1 /*---------------------------------------------------------------------------*
2 Project: Horizon
3 File: os_ThreadPool.cpp
4
5 Copyright (C)2009-2012 Nintendo Co., Ltd. All rights reserved.
6
7 These coded instructions, statements, and computer programs contain
8 proprietary information of Nintendo of America Inc. and/or Nintendo
9 Company Ltd., and are protected by Federal copyright law. They may
10 not be disclosed to third parties or copied or duplicated in any form,
11 in whole or in part, without the prior written consent of Nintendo.
12
13 $Rev: 46347 $
14 *---------------------------------------------------------------------------*/
15
16 #include <nn/os/os_ThreadPool.h>
17 #include <nn/os/os_ErrorHandlerSelect.h>
18 #include <new>
19
20 namespace nn { namespace os {
21
GetThreads() const22 inline Thread* ThreadPool::GetThreads() const
23 {
24 return reinterpret_cast<Thread*>(m_Buffer);
25 }
26
GetWaitHandleBuffer() const27 inline nn::Handle* ThreadPool::GetWaitHandleBuffer() const
28 {
29 return reinterpret_cast<nn::Handle*>(m_Buffer + sizeof(Thread) * m_NumThreads);
30 }
31
GetWaitTaskBuffer() const32 inline QueueableWaitTask** ThreadPool::GetWaitTaskBuffer() const
33 {
34 return reinterpret_cast<QueueableWaitTask**>(m_Buffer + sizeof(Thread) * m_NumThreads + sizeof(nn::Handle) * (m_NumMaxWaitObjects + 1));
35 }
36
GetWorkBufferSize(size_t numMaxWaitObjects,size_t numThreads)37 size_t ThreadPool::GetWorkBufferSize(size_t numMaxWaitObjects, size_t numThreads)
38 {
39 return sizeof(Thread) * numThreads
40 + sizeof(nn::Handle) * (numMaxWaitObjects + 1)
41 + sizeof(QueueableWaitTask*) * numMaxWaitObjects;
42 }
43
InitializeCommon(size_t numMaxWaitObjects,size_t numThreads,void * workBuffer)44 inline void ThreadPool::InitializeCommon(size_t numMaxWaitObjects, size_t numThreads, void* workBuffer)
45 {
46 this->m_Finalizing = false;
47 this->m_WaitingCount = 0;
48 this->m_NumMaxWaitObjects = numMaxWaitObjects;
49 this->m_NumThreads = numThreads;
50 this->m_Buffer = reinterpret_cast<uptr>(workBuffer);
51 this->m_WaitEvent.Initialize(false);
52 this->m_ExecuteEvent.Initialize(false);
53 this->m_WaitLock.Initialize();
54 this->m_ExecuteLock.Initialize();
55 }
56
57 namespace {
58 struct StackBufferAdapter
59 {
60 uptr stackBottom;
StackBufferAdapternn::os::__anonad337e860111::StackBufferAdapter61 StackBufferAdapter(uptr stackBottom) : stackBottom(stackBottom) {}
GetStackBottomnn::os::__anonad337e860111::StackBufferAdapter62 uptr GetStackBottom() const { return stackBottom; }
63 };
64 }
65
66 // Starts the thread to manage tasks on standby.
StartWaitThread(s32 priority)67 inline void ThreadPool::StartWaitThread(s32 priority)
68 {
69 GetWaitHandleBuffer()[0] = this->m_WaitEvent.GetHandle();
70 m_WaitThread.Start(ThreadPool::WaitThreadFunc, this, m_WaitThreadStack, priority);
71 }
72
73 // Starts worker thread to run tasks.
StartExecuteThread(size_t i,uptr stackBottom,s32 priority)74 inline void ThreadPool::StartExecuteThread(size_t i, uptr stackBottom, s32 priority)
75 {
76 Thread* thread = new (GetThreads() + i) Thread();
77 StackBufferAdapter stack(stackBottom);
78 thread->Start(ThreadPool::ExecuteThreadFunc, this, stack, priority);
79 }
80
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,uptr stackBottoms[],s32 workerPriority,s32 waitPriority)81 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, uptr stackBottoms[], s32 workerPriority, s32 waitPriority)
82 {
83 // TODO: Recommend NULL check of workBuffer.
84
85 InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
86 for (size_t i = 0; i < numThreads; i++)
87 {
88 StartExecuteThread(i, stackBottoms[i], workerPriority);
89 }
90 StartWaitThread(waitPriority);
91 }
92
93 #if NN_PLATFORM_HAS_MMU
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,nn::os::StackMemoryBlock stacks[],s32 workerPriority,s32 waitPriority)94 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, nn::os::StackMemoryBlock stacks[], s32 workerPriority, s32 waitPriority)
95 {
96 // TODO: Recommend NULL check of workBuffer.
97
98 InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
99 for (size_t i = 0; i < numThreads; i++)
100 {
101 StartExecuteThread(i, stacks[i].GetStackBottom(), workerPriority);
102 }
103 StartWaitThread(waitPriority);
104 }
105 #endif // if NN_PLATFORM_HAS_MMU
106
Finalize()107 void ThreadPool::Finalize()
108 {
109 // Stops the task standby thread and worker thread.
110 this->m_Finalizing = true;
111
112 m_WaitEvent.Signal();
113 m_ExecuteEvent.Signal();
114
115 for (size_t i = 0; i < m_NumThreads; i++)
116 {
117 Thread& thread = GetThreads()[i];
118 thread.Join();
119 thread.Finalize();
120 }
121
122 m_WaitThread.Join();
123 m_WaitThread.Finalize();
124 }
125
AddToExecuteQueue(QueueableTask * task)126 void ThreadPool::AddToExecuteQueue(QueueableTask* task)
127 {
128 {
129 nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
130
131 m_ExecuteQueue.Enqueue(task);
132 }
133
134 // Wakes up the worker thread to run tasks.
135 m_ExecuteEvent.Signal();
136 }
137
AddToWaitQueue(QueueableWaitTask * task)138 void ThreadPool::AddToWaitQueue(QueueableWaitTask* task)
139 {
140 {
141 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
142
143 m_WaitQueue.Enqueue(task->AsNonWaitableTask());
144 }
145
146 // Wakes up synchronization wait thread.
147 m_WaitEvent.Signal();
148 }
149
150 // Loop processing for the synchronization wait thread.
WaitThreadFunc()151 inline void ThreadPool::WaitThreadFunc()
152 {
153 // The stack size of this function (WAIT_THREAD_STACK_SIZE) is fixed and small.
154 // Take note when revising the code.
155 // Revise the stack size if necessary.
156 while (!m_Finalizing)
157 {
158 {
159 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
160
161 // Adds new tasks added to the synchronization task queue to the synchronization wait handle buffer.
162 for (; m_WaitingCount < m_NumMaxWaitObjects && !m_WaitQueue.IsEmpty(); ++m_WaitingCount)
163 {
164 QueueableWaitTask* task = GetWaitTaskPointer(m_WaitQueue.Dequeue());
165 GetWaitTaskBuffer()[m_WaitingCount] = task;
166 // Specifies a value of 1 or greater in the index because a 0 value for the synchronization wait handle buffer index is used to notify that a task has been added.
167 //
168 GetWaitHandleBuffer()[m_WaitingCount + 1] = task->GetWaitObject()->GetHandle();
169 }
170 }
171
172 // Waits until one of the handles waiting for synchronization enters a signal state.
173 s32 n;
174 NN_OS_ERROR_IF_FAILED(nn::svc::WaitSynchronization(&n, GetWaitHandleBuffer(), m_WaitingCount + 1, false, WAIT_INFINITE));
175 // Does not process when a synchronization task is added to the queue (n == 0).
176 if (n >= 1)
177 {
178 // Move tasks that have completed synchronization wait to the task execution buffer.
179 QueueableTask* task;
180 {
181 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
182
183 task = GetWaitTaskBuffer()[n - 1]->AsNonWaitableTask();
184 GetWaitTaskBuffer()[n - 1] = GetWaitTaskBuffer()[m_WaitingCount - 1];
185 GetWaitHandleBuffer()[n] = GetWaitHandleBuffer()[m_WaitingCount];
186 --m_WaitingCount;
187 }
188 AddToExecuteQueue(task);
189 }
190 }
191 }
192
WaitThreadFunc(ThreadPool * this_)193 void ThreadPool::WaitThreadFunc(ThreadPool* this_)
194 {
195 this_->WaitThreadFunc();
196 }
197
198 // Loop processing for the task execution worker thread.
ExecuteThreadFunc()199 inline void ThreadPool::ExecuteThreadFunc()
200 {
201 while (!m_Finalizing)
202 {
203 // If notification of task execution is received, one of the worker threads on standby runs a task.
204 m_ExecuteEvent.Wait();
205 if (m_Finalizing)
206 {
207 break;
208 }
209 // Locks so that the execution task queue content cannot be read or written while obtaining a task.
210 m_ExecuteLock.Enter();
211 if (QueueableTask* task = m_ExecuteQueue.Dequeue())
212 {
213 if (!m_ExecuteQueue.IsEmpty())
214 {
215 m_ExecuteLock.Leave();
216 // If other tasks exist in the execution task queue, another worker thread is awakened and a task is started.
217 m_ExecuteEvent.Signal();
218 }
219 else
220 {
221 m_ExecuteLock.Leave();
222 }
223 // Executes a task.
224 task->Invoke();
225 }
226 else
227 {
228 m_ExecuteLock.Leave();
229 }
230 }
231
232 // Wakes another worker thread, and executes the finalization process.
233 m_ExecuteEvent.Signal();
234 }
235
ExecuteThreadFunc(ThreadPool * this_)236 void ThreadPool::ExecuteThreadFunc(ThreadPool* this_)
237 {
238 this_->ExecuteThreadFunc();
239 }
240
241 // -----------------------------------------------------------------------
242
243
244
Initialize(uptr stackBottom,s32 priority)245 void SingleThreadPool::Initialize(uptr stackBottom, s32 priority)
246 {
247 this->m_ExecuteEvent.Initialize(false);
248 this->m_ExecuteLock.Initialize();
249 StackBufferAdapter stack(stackBottom);
250 m_WorkerThread.Start(SingleThreadPool::ExecuteThreadFunc, this, stack, priority);
251 }
252
Finalize()253 void SingleThreadPool::Finalize()
254 {
255 // Stops the task standby thread and worker thread.
256 if(!m_Finalizing)
257 {
258 this->m_Finalizing = true;
259 m_ExecuteEvent.Signal();
260 m_WorkerThread.Join();
261 m_WorkerThread.Finalize();
262 }
263 }
264
265 // Loop processing for the task execution worker thread.
ExecuteThreadFunc()266 inline void SingleThreadPool::ExecuteThreadFunc()
267 {
268 while (!m_Finalizing)
269 {
270 // If notification of task execution is received, one of the worker threads on standby runs a task.
271 m_ExecuteEvent.Wait();
272 if (m_Finalizing)
273 {
274 break;
275 }
276 // Locks so that the execution task queue content cannot be read or written while obtaining a task.
277 m_ExecuteLock.Enter();
278 if (QueueableTask* task = m_ExecuteQueue.Dequeue())
279 {
280 if (!m_ExecuteQueue.IsEmpty())
281 {
282 m_ExecuteLock.Leave();
283 // If another task exists in the execution task queue, an event notification is made so that the next execution can be done immediately
284 m_ExecuteEvent.Signal();
285 }
286 else
287 {
288 m_ExecuteLock.Leave();
289 }
290 // Executes a task.
291 task->Invoke();
292 }
293 else
294 {
295 m_ExecuteLock.Leave();
296 }
297 }
298 }
299
ExecuteThreadFunc(SingleThreadPool * this_)300 void SingleThreadPool::ExecuteThreadFunc(SingleThreadPool* this_)
301 {
302 this_->ExecuteThreadFunc();
303 }
304
AddToExecuteQueue(QueueableTask * task)305 void SingleThreadPool::AddToExecuteQueue(QueueableTask* task)
306 {
307 {
308 nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
309
310 m_ExecuteQueue.Enqueue(task);
311 }
312 // Wakes up the worker thread to run tasks.
313 m_ExecuteEvent.Signal();
314 }
315
316 }} // namespace nn::os
317
318 #include <new>
319 using namespace nn::os;
320 using namespace nn::os::detail;
321
nnosThreadPoolTaskInitialize(nnosThreadPoolTask * p,void (* f)(uptr),uptr param)322 NN_EXTERN_C void nnosThreadPoolTaskInitialize(nnosThreadPoolTask* p, void (*f)(uptr), uptr param)
323 {
324 new (p) ThreadPoolTaskForC(f, param);
325 }
326
nnosThreadPoolTaskFinalize(nnosThreadPoolTask * p)327 void nnosThreadPoolTaskFinalize(nnosThreadPoolTask* p)
328 {
329 ThreadPoolTaskForC* pThreadPoolTaskForC = reinterpret_cast<ThreadPoolTaskForC*>(p);
330 pThreadPoolTaskForC->~ThreadPoolTaskForC();
331 }
332
nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask * p,nnosWaitObject * waitObject,void (* f)(uptr),uptr param)333 NN_EXTERN_C void nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask* p, nnosWaitObject* waitObject, void (*f)(uptr), uptr param)
334 {
335 new (p) ThreadPoolWaitTaskForC(waitObject, f, param);
336 }
337
nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask * p)338 void nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask* p)
339 {
340 ThreadPoolWaitTaskForC* pThreadPoolWaitTaskForC = reinterpret_cast<ThreadPoolWaitTaskForC*>(p);
341 pThreadPoolWaitTaskForC->~ThreadPoolWaitTaskForC();
342 }
343
nnosThreadPoolInitialize(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,uptr workerStackBottoms[],s32 workerPriority)344 void nnosThreadPoolInitialize(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, uptr workerStackBottoms[], s32 workerPriority)
345 {
346 ThreadPool* pThreadPool = new (p) ThreadPool();
347 pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, workerStackBottoms, workerPriority);
348 }
349
350 #if NN_PLATFORM_HAS_MMU
nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,nnosStackMemoryBlock workerStacks[],s32 workerPriority)351 void nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, nnosStackMemoryBlock workerStacks[], s32 workerPriority)
352 {
353 ThreadPool* pThreadPool = new (p) ThreadPool();
354 StackMemoryBlock* pWorkerStacks = reinterpret_cast<StackMemoryBlock*>(workerStacks);
355 pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, pWorkerStacks, workerPriority);
356 }
357 #endif // if NN_PLATFORM_HAS_MMU
358
nnosThreadPoolFinalize(nnosThreadPool * p)359 void nnosThreadPoolFinalize(nnosThreadPool* p)
360 {
361 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
362 pThreadPool->Finalize();
363 }
364
nnosThreadPoolAddWaitTask(nnosThreadPool * p,nnosThreadPoolWaitTask * task)365 void nnosThreadPoolAddWaitTask(nnosThreadPool* p, nnosThreadPoolWaitTask* task)
366 {
367 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
368 ThreadPoolWaitTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolWaitTaskForC*>(task);
369 pThreadPool->AddWaitTask(pThreadPoolTask);
370 }
371
nnosThreadPoolAddTask(nnosThreadPool * p,nnosThreadPoolTask * task)372 void nnosThreadPoolAddTask(nnosThreadPool* p, nnosThreadPoolTask* task)
373 {
374 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
375 ThreadPoolTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolTaskForC*>(task);
376 pThreadPool->AddTask(pThreadPoolTask);
377 }
378
nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects,size_t numWorkerThreads)379 size_t nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects, size_t numWorkerThreads)
380 {
381 return ThreadPool::GetWorkBufferSize(numMaxWaitObjects, numWorkerThreads);
382 }
383
384