1 /*---------------------------------------------------------------------------*
2 Project: Horizon
3 File: os_ThreadPool.cpp
4
5 Copyright (C)2009 Nintendo Co., Ltd. All rights reserved.
6
7 These coded instructions, statements, and computer programs contain
8 proprietary information of Nintendo of America Inc. and/or Nintendo
9 Company Ltd., and are protected by Federal copyright law. They may
10 not be disclosed to third parties or copied or duplicated in any form,
11 in whole or in part, without the prior written consent of Nintendo.
12
13 $Rev: 29304 $
14 *---------------------------------------------------------------------------*/
15
16 #include <nn/os/os_ThreadPool.h>
17 #include <nn/err.h>
18 #include <new>
19
20 namespace nn { namespace os {
21
GetThreads() const22 inline Thread* ThreadPool::GetThreads() const
23 {
24 return reinterpret_cast<Thread*>(m_Buffer);
25 }
26
GetWaitHandleBuffer() const27 inline nn::Handle* ThreadPool::GetWaitHandleBuffer() const
28 {
29 return reinterpret_cast<nn::Handle*>(m_Buffer + sizeof(Thread) * m_NumThreads);
30 }
31
GetWaitTaskBuffer() const32 inline QueueableWaitTask** ThreadPool::GetWaitTaskBuffer() const
33 {
34 return reinterpret_cast<QueueableWaitTask**>(m_Buffer + sizeof(Thread) * m_NumThreads + sizeof(nn::Handle) * (m_NumMaxWaitObjects + 1));
35 }
36
GetWorkBufferSize(size_t numMaxWaitObjects,size_t numThreads)37 size_t ThreadPool::GetWorkBufferSize(size_t numMaxWaitObjects, size_t numThreads)
38 {
39 return sizeof(Thread) * numThreads
40 + sizeof(nn::Handle) * (numMaxWaitObjects + 1)
41 + sizeof(QueueableWaitTask*) * numMaxWaitObjects;
42 }
43
InitializeCommon(size_t numMaxWaitObjects,size_t numThreads,void * workBuffer)44 inline void ThreadPool::InitializeCommon(size_t numMaxWaitObjects, size_t numThreads, void* workBuffer)
45 {
46 this->m_Finalizing = false;
47 this->m_WaitingCount = 0;
48 this->m_NumMaxWaitObjects = numMaxWaitObjects;
49 this->m_NumThreads = numThreads;
50 this->m_Buffer = reinterpret_cast<uptr>(workBuffer);
51 this->m_WaitEvent.Initialize(false);
52 this->m_ExecuteEvent.Initialize(false);
53 this->m_WaitLock.Initialize();
54 this->m_ExecuteLock.Initialize();
55 }
56
57 namespace {
58 struct StackBufferAdapter
59 {
60 uptr stackBottom;
StackBufferAdapternn::os::__anon38abe66c0111::StackBufferAdapter61 StackBufferAdapter(uptr stackBottom) : stackBottom(stackBottom) {}
GetStackBottomnn::os::__anon38abe66c0111::StackBufferAdapter62 uptr GetStackBottom() const { return stackBottom; }
63 };
64 }
65
66 // 待機タスクを管理するスレッドを開始します。
StartWaitThread()67 inline void ThreadPool::StartWaitThread()
68 {
69 GetWaitHandleBuffer()[0] = this->m_WaitEvent.GetHandle();
70 m_WaitThread.Start(ThreadPool::WaitThreadFunc, this, m_WaitThreadStack, 0);
71 }
72
73 // タスクを実行するワーカースレッドを開始します。
StartExecuteThread(size_t i,uptr stackBottom,s32 priority)74 inline void ThreadPool::StartExecuteThread(size_t i, uptr stackBottom, s32 priority)
75 {
76 Thread* thread = new (GetThreads() + i) Thread();
77 StackBufferAdapter stack(stackBottom);
78 thread->Start(ThreadPool::ExecuteThreadFunc, this, stack, priority);
79 }
80
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,uptr stackBottoms[],s32 priority)81 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, uptr stackBottoms[], s32 priority)
82 {
83 // TODO: workBuffer の NULL チェックを推奨します。
84
85 InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
86 for (size_t i = 0; i < numThreads; i++)
87 {
88 StartExecuteThread(i, stackBottoms[i], priority);
89 }
90 StartWaitThread();
91 }
92
93 #if NN_PLATFORM_HAS_MMU
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,nn::os::StackMemoryBlock stacks[],s32 priority)94 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, nn::os::StackMemoryBlock stacks[], s32 priority)
95 {
96 // TODO: workBuffer の NULL チェックを推奨します。
97
98 InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
99 for (size_t i = 0; i < numThreads; i++)
100 {
101 StartExecuteThread(i, stacks[i].GetStackBottom(), priority);
102 }
103 StartWaitThread();
104 }
105 #endif // if NN_PLATFORM_HAS_MMU
106
Finalize()107 void ThreadPool::Finalize()
108 {
109 // タスク待機スレッド、ワーカースレッドを停止します。
110 this->m_Finalizing = true;
111
112 m_WaitEvent.Signal();
113 m_ExecuteEvent.Signal();
114
115 for (size_t i = 0; i < m_NumThreads; i++)
116 {
117 Thread& thread = GetThreads()[i];
118 thread.Join();
119 thread.Finalize();
120 }
121
122 m_WaitThread.Join();
123 m_WaitThread.Finalize();
124 }
125
AddToExecuteQueue(QueueableTask * task)126 void ThreadPool::AddToExecuteQueue(QueueableTask* task)
127 {
128 {
129 nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
130
131 m_ExecuteQueue.Enqueue(task);
132 }
133
134 // タスクを実行するワーカースレッドを起床します。
135 m_ExecuteEvent.Signal();
136 }
137
AddToWaitQueue(QueueableWaitTask * task)138 void ThreadPool::AddToWaitQueue(QueueableWaitTask* task)
139 {
140 {
141 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
142
143 m_WaitQueue.Enqueue(task->AsNonWaitableTask());
144 }
145
146 // 同期待ちスレッドを起床します。
147 m_WaitEvent.Signal();
148 }
149
150 // 同期待ちスレッドでのループ処理です。
WaitThreadFunc()151 inline void ThreadPool::WaitThreadFunc()
152 {
153 // この関数のスタックサイズ(WAIT_THREAD_STACK_SIZE)は固定で小さめに取っている。
154 // コードを修正する際に注意すること。
155 // 必要であれば、スタックサイズを修正する。
156 while (!m_Finalizing)
157 {
158 {
159 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
160
161 // 新しく同期タスクキューに追加されたタスクを同期待ちハンドルバッファに追加します。
162 for (; m_WaitingCount < m_NumMaxWaitObjects && !m_WaitQueue.IsEmpty(); ++m_WaitingCount)
163 {
164 QueueableWaitTask* task = GetWaitTaskPointer(m_WaitQueue.Dequeue());
165 GetWaitTaskBuffer()[m_WaitingCount] = task;
166 // 同期待ちハンドルバッファのインデックス 0 はタスクが追加されたことを
167 // 通知するために利用するのでインデックスには 1 以上の値を指定します。
168 GetWaitHandleBuffer()[m_WaitingCount + 1] = task->GetWaitObject()->GetHandle();
169 }
170 }
171
172 // 同期待ち中のハンドルのいずれかがシグナル状態になるまで待ちます。
173 s32 n;
174 NN_ERR_THROW_FATAL(nn::svc::WaitSynchronization(&n, GetWaitHandleBuffer(), m_WaitingCount + 1, false, WAIT_INFINITE));
175 // キューに同期タスクが追加されたとき (n == 0) は処理は行いません。
176 if (n >= 1)
177 {
178 // 同期待ちが完了したタスクをタスク実行バッファに移します。
179 QueueableTask* task;
180 {
181 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
182
183 task = GetWaitTaskBuffer()[n - 1]->AsNonWaitableTask();
184 GetWaitTaskBuffer()[n - 1] = GetWaitTaskBuffer()[m_WaitingCount - 1];
185 GetWaitHandleBuffer()[n] = GetWaitHandleBuffer()[m_WaitingCount];
186 --m_WaitingCount;
187 }
188 AddToExecuteQueue(task);
189 }
190 }
191 }
192
WaitThreadFunc(ThreadPool * this_)193 void ThreadPool::WaitThreadFunc(ThreadPool* this_)
194 {
195 this_->WaitThreadFunc();
196 }
197
198 // タスク実行ワーカースレッドのループ処理です。
ExecuteThreadFunc()199 inline void ThreadPool::ExecuteThreadFunc()
200 {
201 while (!m_Finalizing)
202 {
203 // タスク実行通知がくれば、待機中のワーカースレッドのいずれかがタスクを実行します。
204 m_ExecuteEvent.Wait();
205 if (m_Finalizing)
206 {
207 break;
208 }
209 // タスク取得中に実行タスクキューの内容を読み書きされないようにロックします。
210 m_ExecuteLock.Enter();
211 if (QueueableTask* task = m_ExecuteQueue.Dequeue())
212 {
213 if (!m_ExecuteQueue.IsEmpty())
214 {
215 m_ExecuteLock.Leave();
216 // 実行タスクキューに他のタスクが存在すれば、他のワーカースレッドを起床してタスクを実行させます。
217 m_ExecuteEvent.Signal();
218 }
219 else
220 {
221 m_ExecuteLock.Leave();
222 }
223 // タスクを実行します。
224 task->Invoke();
225 }
226 else
227 {
228 m_ExecuteLock.Leave();
229 }
230 }
231
232 // 他のワーカースレッドを起床し、終了処理を実行させます。
233 m_ExecuteEvent.Signal();
234 }
235
ExecuteThreadFunc(ThreadPool * this_)236 void ThreadPool::ExecuteThreadFunc(ThreadPool* this_)
237 {
238 this_->ExecuteThreadFunc();
239 }
240
241 // -----------------------------------------------------------------------
242
243
244
Initialize(uptr stackBottom,s32 priority)245 void SingleThreadPool::Initialize(uptr stackBottom, s32 priority)
246 {
247 this->m_ExecuteEvent.Initialize(false);
248 this->m_ExecuteLock.Initialize();
249 StackBufferAdapter stack(stackBottom);
250 m_WorkerThread.Start(SingleThreadPool::ExecuteThreadFunc, this, stack, priority);
251 }
252
Finalize()253 void SingleThreadPool::Finalize()
254 {
255 // タスク待機スレッド、ワーカースレッドを停止します。
256 if(!m_Finalizing)
257 {
258 this->m_Finalizing = true;
259 m_ExecuteEvent.Signal();
260 m_WorkerThread.Join();
261 m_WorkerThread.Finalize();
262 }
263 }
264
265 // タスク実行ワーカースレッドのループ処理です。
ExecuteThreadFunc()266 inline void SingleThreadPool::ExecuteThreadFunc()
267 {
268 while (!m_Finalizing)
269 {
270 // タスク実行通知がくれば、待機中のワーカースレッドのいずれかがタスクを実行します。
271 m_ExecuteEvent.Wait();
272 if (m_Finalizing)
273 {
274 break;
275 }
276 // タスク取得中に実行タスクキューの内容を読み書きされないようにロックします。
277 m_ExecuteLock.Enter();
278 if (QueueableTask* task = m_ExecuteQueue.Dequeue())
279 {
280 if (!m_ExecuteQueue.IsEmpty())
281 {
282 m_ExecuteLock.Leave();
283 // 実行タスクキューに他のタスクが存在すれば、即座に次を実行できるようにイベントを通知しておく
284 m_ExecuteEvent.Signal();
285 }
286 else
287 {
288 m_ExecuteLock.Leave();
289 }
290 // タスクを実行します。
291 task->Invoke();
292 }
293 else
294 {
295 m_ExecuteLock.Leave();
296 }
297 }
298 }
299
ExecuteThreadFunc(SingleThreadPool * this_)300 void SingleThreadPool::ExecuteThreadFunc(SingleThreadPool* this_)
301 {
302 this_->ExecuteThreadFunc();
303 }
304
AddToExecuteQueue(QueueableTask * task)305 void SingleThreadPool::AddToExecuteQueue(QueueableTask* task)
306 {
307 {
308 nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
309
310 m_ExecuteQueue.Enqueue(task);
311 }
312 // タスクを実行するワーカースレッドを起床します。
313 m_ExecuteEvent.Signal();
314 }
315
316 }} // namespace nn::os
317
318 #include <new>
319 using namespace nn::os;
320 using namespace nn::os::detail;
321
nnosThreadPoolTaskInitialize(nnosThreadPoolTask * p,void (* f)(uptr),uptr param)322 NN_EXTERN_C void nnosThreadPoolTaskInitialize(nnosThreadPoolTask* p, void (*f)(uptr), uptr param)
323 {
324 new (p) ThreadPoolTaskForC(f, param);
325 }
326
nnosThreadPoolTaskFinalize(nnosThreadPoolTask * p)327 void nnosThreadPoolTaskFinalize(nnosThreadPoolTask* p)
328 {
329 ThreadPoolTaskForC* pThreadPoolTaskForC = reinterpret_cast<ThreadPoolTaskForC*>(p);
330 pThreadPoolTaskForC->~ThreadPoolTaskForC();
331 }
332
nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask * p,nnosWaitObject * waitObject,void (* f)(uptr),uptr param)333 NN_EXTERN_C void nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask* p, nnosWaitObject* waitObject, void (*f)(uptr), uptr param)
334 {
335 new (p) ThreadPoolWaitTaskForC(waitObject, f, param);
336 }
337
nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask * p)338 void nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask* p)
339 {
340 ThreadPoolWaitTaskForC* pThreadPoolWaitTaskForC = reinterpret_cast<ThreadPoolWaitTaskForC*>(p);
341 pThreadPoolWaitTaskForC->~ThreadPoolWaitTaskForC();
342 }
343
nnosThreadPoolInitialize(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,uptr workerStackBottoms[],s32 workerPriority)344 void nnosThreadPoolInitialize(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, uptr workerStackBottoms[], s32 workerPriority)
345 {
346 ThreadPool* pThreadPool = new (p) ThreadPool();
347 pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, workerStackBottoms, workerPriority);
348 }
349
350 #if NN_PLATFORM_HAS_MMU
nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,nnosStackMemoryBlock workerStacks[],s32 workerPriority)351 void nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, nnosStackMemoryBlock workerStacks[], s32 workerPriority)
352 {
353 ThreadPool* pThreadPool = new (p) ThreadPool();
354 StackMemoryBlock* pWorkerStacks = reinterpret_cast<StackMemoryBlock*>(workerStacks);
355 pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, pWorkerStacks, workerPriority);
356 }
357 #endif // if NN_PLATFORM_HAS_MMU
358
nnosThreadPoolFinalize(nnosThreadPool * p)359 void nnosThreadPoolFinalize(nnosThreadPool* p)
360 {
361 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
362 pThreadPool->Finalize();
363 }
364
nnosThreadPoolAddWaitTask(nnosThreadPool * p,nnosThreadPoolWaitTask * task)365 void nnosThreadPoolAddWaitTask(nnosThreadPool* p, nnosThreadPoolWaitTask* task)
366 {
367 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
368 ThreadPoolWaitTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolWaitTaskForC*>(task);
369 pThreadPool->AddWaitTask(pThreadPoolTask);
370 }
371
nnosThreadPoolAddTask(nnosThreadPool * p,nnosThreadPoolTask * task)372 void nnosThreadPoolAddTask(nnosThreadPool* p, nnosThreadPoolTask* task)
373 {
374 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
375 ThreadPoolTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolTaskForC*>(task);
376 pThreadPool->AddTask(pThreadPoolTask);
377 }
378
nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects,size_t numWorkerThreads)379 size_t nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects, size_t numWorkerThreads)
380 {
381 return ThreadPool::GetWorkBufferSize(numMaxWaitObjects, numWorkerThreads);
382 }
383
384