1 /*---------------------------------------------------------------------------*
2 Project: Horizon
3 File: os_ThreadPool.cpp
4 Copyright (C)2009 Nintendo Co., Ltd. All rights reserved.
5 These coded instructions, statements, and computer programs contain
6 proprietary information of Nintendo of America Inc. and/or Nintendo
7 Company Ltd., and are protected by Federal copyright law. They may
8 not be disclosed to third parties or copied or duplicated in any form,
9 in whole or in part, without the prior written consent of Nintendo.
10 $Rev: 34337 $
11 *---------------------------------------------------------------------------
12
13
14 */
15
16 #include <nn/os/os_ThreadPool.h>
17 #include <nn/err.h>
18 #include <new>
19
20 namespace nn { namespace os {
21
GetThreads() const22 inline Thread* ThreadPool::GetThreads() const
23 {
24 return reinterpret_cast<Thread*>(m_Buffer);
25 }
26
GetWaitHandleBuffer() const27 inline nn::Handle* ThreadPool::GetWaitHandleBuffer() const
28 {
29 return reinterpret_cast<nn::Handle*>(m_Buffer + sizeof(Thread) * m_NumThreads);
30 }
31
GetWaitTaskBuffer() const32 inline QueueableWaitTask** ThreadPool::GetWaitTaskBuffer() const
33 {
34 return reinterpret_cast<QueueableWaitTask**>(m_Buffer + sizeof(Thread) * m_NumThreads + sizeof(nn::Handle) * (m_NumMaxWaitObjects + 1));
35 }
36
GetWorkBufferSize(size_t numMaxWaitObjects,size_t numThreads)37 size_t ThreadPool::GetWorkBufferSize(size_t numMaxWaitObjects, size_t numThreads)
38 {
39 return sizeof(Thread) * numThreads
40 + sizeof(nn::Handle) * (numMaxWaitObjects + 1)
41 + sizeof(QueueableWaitTask*) * numMaxWaitObjects;
42 }
43
InitializeCommon(size_t numMaxWaitObjects,size_t numThreads,void * workBuffer)44 inline void ThreadPool::InitializeCommon(size_t numMaxWaitObjects, size_t numThreads, void* workBuffer)
45 {
46 this->m_Finalizing = false;
47 this->m_WaitingCount = 0;
48 this->m_NumMaxWaitObjects = numMaxWaitObjects;
49 this->m_NumThreads = numThreads;
50 this->m_Buffer = reinterpret_cast<uptr>(workBuffer);
51 this->m_WaitEvent.Initialize(false);
52 this->m_ExecuteEvent.Initialize(false);
53 this->m_WaitLock.Initialize();
54 this->m_ExecuteLock.Initialize();
55 }
56
57 namespace {
58 struct StackBufferAdapter
59 {
60 uptr stackBottom;
StackBufferAdapternn::os::__anon6efd3d550111::StackBufferAdapter61 StackBufferAdapter(uptr stackBottom) : stackBottom(stackBottom) {}
GetStackBottomnn::os::__anon6efd3d550111::StackBufferAdapter62 uptr GetStackBottom() const { return stackBottom; }
63 };
64 }
65
66 // Starts the thread to manage tasks on standby.
StartWaitThread(s32 priority)67 inline void ThreadPool::StartWaitThread(s32 priority)
68 {
69 GetWaitHandleBuffer()[0] = this->m_WaitEvent.GetHandle();
70 m_WaitThread.Start(ThreadPool::WaitThreadFunc, this, m_WaitThreadStack, priority);
71 }
72
73 // Starts worker thread to execute tasks.
StartExecuteThread(size_t i,uptr stackBottom,s32 priority)74 inline void ThreadPool::StartExecuteThread(size_t i, uptr stackBottom, s32 priority)
75 {
76 Thread* thread = new (GetThreads() + i) Thread();
77 StackBufferAdapter stack(stackBottom);
78 thread->Start(ThreadPool::ExecuteThreadFunc, this, stack, priority);
79 }
80
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,uptr stackBottoms[],s32 workerPriority,s32 waitPriority)81 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, uptr stackBottoms[], s32 workerPriority, s32 waitPriority)
82 {
83 // TODO: Recommend NULL check of workBuffer.
84
85 InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
86 for (size_t i = 0; i < numThreads; i++)
87 {
88 StartExecuteThread(i, stackBottoms[i], workerPriority);
89 }
90 StartWaitThread(waitPriority);
91 }
92
#if NN_PLATFORM_HAS_MMU
// Initializes the pool using StackMemoryBlock objects for the worker stacks.
// Same as the uptr overload, except that each worker's stack bottom is
// obtained from stacks[i].GetStackBottom().
void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, nn::os::StackMemoryBlock stacks[], s32 workerPriority, s32 waitPriority)
{
    // TODO: Recommend NULL check of workBuffer.

    InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);

    // Workers are started first, the wait thread last.
    size_t worker = 0;
    while (worker < numThreads)
    {
        StartExecuteThread(worker, stacks[worker].GetStackBottom(), workerPriority);
        ++worker;
    }

    StartWaitThread(waitPriority);
}
#endif  // if NN_PLATFORM_HAS_MMU
106
Finalize()107 void ThreadPool::Finalize()
108 {
109 // Stops the task standby thread and worker thread.
110 this->m_Finalizing = true;
111
112 m_WaitEvent.Signal();
113 m_ExecuteEvent.Signal();
114
115 for (size_t i = 0; i < m_NumThreads; i++)
116 {
117 Thread& thread = GetThreads()[i];
118 thread.Join();
119 thread.Finalize();
120 }
121
122 m_WaitThread.Join();
123 m_WaitThread.Finalize();
124 }
125
AddToExecuteQueue(QueueableTask * task)126 void ThreadPool::AddToExecuteQueue(QueueableTask* task)
127 {
128 {
129 nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
130
131 m_ExecuteQueue.Enqueue(task);
132 }
133
134 // Wakes up the worker thread to execute tasks.
135 m_ExecuteEvent.Signal();
136 }
137
AddToWaitQueue(QueueableWaitTask * task)138 void ThreadPool::AddToWaitQueue(QueueableWaitTask* task)
139 {
140 {
141 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
142
143 m_WaitQueue.Enqueue(task->AsNonWaitableTask());
144 }
145
146 // Wakes up synchronization wait thread.
147 m_WaitEvent.Signal();
148 }
149
150 // Loop processing for the synchronization wait thread.
WaitThreadFunc()151 inline void ThreadPool::WaitThreadFunc()
152 {
153 // The stack size of this function (WAIT_THREAD_STACK_SIZE) is fixed and small.
154 // Take note when revising the code.
155 // Revise the stack size if necessary.
156 while (!m_Finalizing)
157 {
158 {
159 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
160
161 // Adds new tasks added to the synchronization task queue to the synchronization wait handle buffer.
162 for (; m_WaitingCount < m_NumMaxWaitObjects && !m_WaitQueue.IsEmpty(); ++m_WaitingCount)
163 {
164 QueueableWaitTask* task = GetWaitTaskPointer(m_WaitQueue.Dequeue());
165 GetWaitTaskBuffer()[m_WaitingCount] = task;
166 // Specifies a value of 1 or greater in the index because a 0 value for the synchronization wait handle buffer index is used to notify that a task has been added.
167 //
168 GetWaitHandleBuffer()[m_WaitingCount + 1] = task->GetWaitObject()->GetHandle();
169 }
170 }
171
172 // Waits until one of the handles waiting for synchronization enters a signal state.
173 s32 n;
174 NN_ERR_THROW_FATAL(nn::svc::WaitSynchronization(&n, GetWaitHandleBuffer(), m_WaitingCount + 1, false, WAIT_INFINITE));
175 // Does not process when a synchronization task is added to the queue (n == 0).
176 if (n >= 1)
177 {
178 // Move tasks that have completed synchronization wait to the task execution buffer.
179 QueueableTask* task;
180 {
181 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
182
183 task = GetWaitTaskBuffer()[n - 1]->AsNonWaitableTask();
184 GetWaitTaskBuffer()[n - 1] = GetWaitTaskBuffer()[m_WaitingCount - 1];
185 GetWaitHandleBuffer()[n] = GetWaitHandleBuffer()[m_WaitingCount];
186 --m_WaitingCount;
187 }
188 AddToExecuteQueue(task);
189 }
190 }
191 }
192
WaitThreadFunc(ThreadPool * this_)193 void ThreadPool::WaitThreadFunc(ThreadPool* this_)
194 {
195 this_->WaitThreadFunc();
196 }
197
198 // Loop processing for the task execution worker thread.
ExecuteThreadFunc()199 inline void ThreadPool::ExecuteThreadFunc()
200 {
201 while (!m_Finalizing)
202 {
203 // If notification of task execution is received, one of the worker threads on standby executes a task.
204 m_ExecuteEvent.Wait();
205 if (m_Finalizing)
206 {
207 break;
208 }
209 // Locks so that the execution task queue content cannot be read or written while obtaining a task.
210 m_ExecuteLock.Enter();
211 if (QueueableTask* task = m_ExecuteQueue.Dequeue())
212 {
213 if (!m_ExecuteQueue.IsEmpty())
214 {
215 m_ExecuteLock.Leave();
216 // If other tasks exist in the execution task queue, another worker thread is awakened and a task is executed.
217 m_ExecuteEvent.Signal();
218 }
219 else
220 {
221 m_ExecuteLock.Leave();
222 }
223 // Executes a task.
224 task->Invoke();
225 }
226 else
227 {
228 m_ExecuteLock.Leave();
229 }
230 }
231
232 // Wakes another worker thread, and executes the finalization process.
233 m_ExecuteEvent.Signal();
234 }
235
ExecuteThreadFunc(ThreadPool * this_)236 void ThreadPool::ExecuteThreadFunc(ThreadPool* this_)
237 {
238 this_->ExecuteThreadFunc();
239 }
240
241 // -----------------------------------------------------------------------
242
243
244
Initialize(uptr stackBottom,s32 priority)245 void SingleThreadPool::Initialize(uptr stackBottom, s32 priority)
246 {
247 this->m_ExecuteEvent.Initialize(false);
248 this->m_ExecuteLock.Initialize();
249 StackBufferAdapter stack(stackBottom);
250 m_WorkerThread.Start(SingleThreadPool::ExecuteThreadFunc, this, stack, priority);
251 }
252
Finalize()253 void SingleThreadPool::Finalize()
254 {
255 // Stops the task standby thread and worker thread.
256 if(!m_Finalizing)
257 {
258 this->m_Finalizing = true;
259 m_ExecuteEvent.Signal();
260 m_WorkerThread.Join();
261 m_WorkerThread.Finalize();
262 }
263 }
264
265 // Loop processing for the task execution worker thread.
ExecuteThreadFunc()266 inline void SingleThreadPool::ExecuteThreadFunc()
267 {
268 while (!m_Finalizing)
269 {
270 // If notification of task execution is received, one of the worker threads on standby executes a task.
271 m_ExecuteEvent.Wait();
272 if (m_Finalizing)
273 {
274 break;
275 }
276 // Locks so that the execution task queue content cannot be read or written while obtaining a task.
277 m_ExecuteLock.Enter();
278 if (QueueableTask* task = m_ExecuteQueue.Dequeue())
279 {
280 if (!m_ExecuteQueue.IsEmpty())
281 {
282 m_ExecuteLock.Leave();
283 // If another task exists in the execution task queue, an event notification is made so that the next execution can be done immediately
284 m_ExecuteEvent.Signal();
285 }
286 else
287 {
288 m_ExecuteLock.Leave();
289 }
290 // Executes a task.
291 task->Invoke();
292 }
293 else
294 {
295 m_ExecuteLock.Leave();
296 }
297 }
298 }
299
ExecuteThreadFunc(SingleThreadPool * this_)300 void SingleThreadPool::ExecuteThreadFunc(SingleThreadPool* this_)
301 {
302 this_->ExecuteThreadFunc();
303 }
304
AddToExecuteQueue(QueueableTask * task)305 void SingleThreadPool::AddToExecuteQueue(QueueableTask* task)
306 {
307 {
308 nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
309
310 m_ExecuteQueue.Enqueue(task);
311 }
312 // Wakes up the worker thread to execute tasks.
313 m_ExecuteEvent.Signal();
314 }
315
316 }} // namespace nn::os
317
318 #include <new>
319 using namespace nn::os;
320 using namespace nn::os::detail;
321
nnosThreadPoolTaskInitialize(nnosThreadPoolTask * p,void (* f)(uptr),uptr param)322 NN_EXTERN_C void nnosThreadPoolTaskInitialize(nnosThreadPoolTask* p, void (*f)(uptr), uptr param)
323 {
324 new (p) ThreadPoolTaskForC(f, param);
325 }
326
nnosThreadPoolTaskFinalize(nnosThreadPoolTask * p)327 void nnosThreadPoolTaskFinalize(nnosThreadPoolTask* p)
328 {
329 ThreadPoolTaskForC* pThreadPoolTaskForC = reinterpret_cast<ThreadPoolTaskForC*>(p);
330 pThreadPoolTaskForC->~ThreadPoolTaskForC();
331 }
332
nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask * p,nnosWaitObject * waitObject,void (* f)(uptr),uptr param)333 NN_EXTERN_C void nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask* p, nnosWaitObject* waitObject, void (*f)(uptr), uptr param)
334 {
335 new (p) ThreadPoolWaitTaskForC(waitObject, f, param);
336 }
337
nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask * p)338 void nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask* p)
339 {
340 ThreadPoolWaitTaskForC* pThreadPoolWaitTaskForC = reinterpret_cast<ThreadPoolWaitTaskForC*>(p);
341 pThreadPoolWaitTaskForC->~ThreadPoolWaitTaskForC();
342 }
343
nnosThreadPoolInitialize(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,uptr workerStackBottoms[],s32 workerPriority)344 void nnosThreadPoolInitialize(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, uptr workerStackBottoms[], s32 workerPriority)
345 {
346 ThreadPool* pThreadPool = new (p) ThreadPool();
347 pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, workerStackBottoms, workerPriority);
348 }
349
#if NN_PLATFORM_HAS_MMU
// C API: constructs a ThreadPool in the storage pointed to by p and
// initializes it with stack memory blocks for the worker stacks.
// The C-side nnosStackMemoryBlock array is reinterpreted as an array of StackMemoryBlock.
void nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, nnosStackMemoryBlock workerStacks[], s32 workerPriority)
{
    ThreadPool* const pool = new (p) ThreadPool();
    StackMemoryBlock* const stackBlocks = reinterpret_cast<StackMemoryBlock*>(workerStacks);
    pool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, stackBlocks, workerPriority);
}
#endif  // if NN_PLATFORM_HAS_MMU
358
nnosThreadPoolFinalize(nnosThreadPool * p)359 void nnosThreadPoolFinalize(nnosThreadPool* p)
360 {
361 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
362 pThreadPool->Finalize();
363 }
364
nnosThreadPoolAddWaitTask(nnosThreadPool * p,nnosThreadPoolWaitTask * task)365 void nnosThreadPoolAddWaitTask(nnosThreadPool* p, nnosThreadPoolWaitTask* task)
366 {
367 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
368 ThreadPoolWaitTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolWaitTaskForC*>(task);
369 pThreadPool->AddWaitTask(pThreadPoolTask);
370 }
371
nnosThreadPoolAddTask(nnosThreadPool * p,nnosThreadPoolTask * task)372 void nnosThreadPoolAddTask(nnosThreadPool* p, nnosThreadPoolTask* task)
373 {
374 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
375 ThreadPoolTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolTaskForC*>(task);
376 pThreadPool->AddTask(pThreadPoolTask);
377 }
378
nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects,size_t numWorkerThreads)379 size_t nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects, size_t numWorkerThreads)
380 {
381 return ThreadPool::GetWorkBufferSize(numMaxWaitObjects, numWorkerThreads);
382 }
383
384