1 /*---------------------------------------------------------------------------*
2 Project: Horizon
3 File: os_ThreadPool.cpp
4
5 Copyright (C)2009 Nintendo Co., Ltd. All rights reserved.
6
7 These coded instructions, statements, and computer programs contain
8 proprietary information of Nintendo of America Inc. and/or Nintendo
9 Company Ltd., and are protected by Federal copyright law. They may
10 not be disclosed to third parties or copied or duplicated in any form,
11 in whole or in part, without the prior written consent of Nintendo.
12
13 $Rev: 24252 $
14 *---------------------------------------------------------------------------*/
15
16 #include <nn/os/os_ThreadPool.h>
17 #include <new>
18
19 namespace nn { namespace os {
20
GetThreads() const21 inline Thread* ThreadPool::GetThreads() const
22 {
23 return reinterpret_cast<Thread*>(m_Buffer);
24 }
25
GetWaitHandleBuffer() const26 inline nn::Handle* ThreadPool::GetWaitHandleBuffer() const
27 {
28 return reinterpret_cast<nn::Handle*>(m_Buffer + sizeof(Thread) * m_NumThreads);
29 }
30
GetWaitTaskBuffer() const31 inline QueueableWaitTask** ThreadPool::GetWaitTaskBuffer() const
32 {
33 return reinterpret_cast<QueueableWaitTask**>(m_Buffer + sizeof(Thread) * m_NumThreads + sizeof(nn::Handle) * (m_NumMaxWaitObjects + 1));
34 }
35
GetWorkBufferSize(size_t numMaxWaitObjects,size_t numThreads)36 size_t ThreadPool::GetWorkBufferSize(size_t numMaxWaitObjects, size_t numThreads)
37 {
38 return sizeof(Thread) * numThreads
39 + sizeof(nn::Handle) * (numMaxWaitObjects + 1)
40 + sizeof(QueueableWaitTask*) * numMaxWaitObjects;
41 }
42
InitializeCommon(size_t numMaxWaitObjects,size_t numThreads,void * workBuffer)43 inline void ThreadPool::InitializeCommon(size_t numMaxWaitObjects, size_t numThreads, void* workBuffer)
44 {
45 this->m_Finalizing = false;
46 this->m_WaitingCount = 0;
47 this->m_NumMaxWaitObjects = numMaxWaitObjects;
48 this->m_NumThreads = numThreads;
49 this->m_Buffer = reinterpret_cast<uptr>(workBuffer);
50 this->m_WaitEvent.Initialize(false);
51 this->m_ExecuteEvent.Initialize(false);
52 this->m_WaitLock.Initialize();
53 this->m_ExecuteLock.Initialize();
54 }
55
56 namespace {
57 struct StackBufferAdapter
58 {
59 uptr stackBottom;
StackBufferAdapternn::os::__anonbd42df890111::StackBufferAdapter60 StackBufferAdapter(uptr stackBottom) : stackBottom(stackBottom) {}
GetStackBottomnn::os::__anonbd42df890111::StackBufferAdapter61 uptr GetStackBottom() const { return stackBottom; }
62 };
63 }
64
65 // 待機タスクを管理するスレッドを開始します。
StartWaitThread()66 inline void ThreadPool::StartWaitThread()
67 {
68 GetWaitHandleBuffer()[0] = this->m_WaitEvent.GetHandle();
69 m_WaitThread.Start(ThreadPool::WaitThreadFunc, this, m_WaitThreadStack, 0);
70 }
71
72 // タスクを実行するワーカースレッドを開始します。
StartExecuteThread(size_t i,uptr stackBottom,s32 priority)73 inline void ThreadPool::StartExecuteThread(size_t i, uptr stackBottom, s32 priority)
74 {
75 Thread* thread = new (GetThreads() + i) Thread();
76 StackBufferAdapter stack(stackBottom);
77 thread->Start(ThreadPool::ExecuteThreadFunc, this, stack, priority);
78 }
79
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,uptr stackBottoms[],s32 priority)80 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, uptr stackBottoms[], s32 priority)
81 {
82 // TODO: workBuffer の NULL チェックを推奨します。
83
84 InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
85 for (size_t i = 0; i < numThreads; i++)
86 {
87 StartExecuteThread(i, stackBottoms[i], priority);
88 }
89 StartWaitThread();
90 }
91
92 #if NN_PLATFORM_HAS_MMU
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,nn::os::StackMemoryBlock stacks[],s32 priority)93 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, nn::os::StackMemoryBlock stacks[], s32 priority)
94 {
95 // TODO: workBuffer の NULL チェックを推奨します。
96
97 InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
98 for (size_t i = 0; i < numThreads; i++)
99 {
100 StartExecuteThread(i, stacks[i].GetStackBottom(), priority);
101 }
102 StartWaitThread();
103 }
104 #endif // if NN_PLATFORM_HAS_MMU
105
Finalize()106 void ThreadPool::Finalize()
107 {
108 // タスク待機スレッド、ワーカースレッドを停止します。
109 this->m_Finalizing = true;
110
111 m_WaitEvent.Signal();
112 m_ExecuteEvent.Signal();
113
114 for (size_t i = 0; i < m_NumThreads; i++)
115 {
116 Thread& thread = GetThreads()[i];
117 thread.Join();
118 thread.Finalize();
119 }
120
121 m_WaitThread.Join();
122 m_WaitThread.Finalize();
123 }
124
AddToExecuteQueue(QueueableTask * task)125 void ThreadPool::AddToExecuteQueue(QueueableTask* task)
126 {
127 {
128 nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
129
130 m_ExecuteQueue.Enqueue(task);
131 }
132
133 // タスクを実行するワーカースレッドを起床します。
134 m_ExecuteEvent.Signal();
135 }
136
AddToWaitQueue(QueueableWaitTask * task)137 void ThreadPool::AddToWaitQueue(QueueableWaitTask* task)
138 {
139 {
140 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
141
142 m_WaitQueue.Enqueue(task->AsNonWaitableTask());
143 }
144
145 // 同期待ちスレッドを起床します。
146 m_WaitEvent.Signal();
147 }
148
149 // 同期待ちスレッドでのループ処理です。
WaitThreadFunc()150 inline void ThreadPool::WaitThreadFunc()
151 {
152 // この関数のスタックサイズ(WAIT_THREAD_STACK_SIZE)は固定で小さめに取っている。
153 // コードを修正する際に注意すること。
154 // 必要であれば、スタックサイズを修正する。
155 while (!m_Finalizing)
156 {
157 {
158 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
159
160 // 新しく同期タスクキューに追加されたタスクを同期待ちハンドルバッファに追加します。
161 for (; m_WaitingCount < m_NumMaxWaitObjects && !m_WaitQueue.IsEmpty(); ++m_WaitingCount)
162 {
163 QueueableWaitTask* task = GetWaitTaskPointer(m_WaitQueue.Dequeue());
164 GetWaitTaskBuffer()[m_WaitingCount] = task;
165 // 同期待ちハンドルバッファのインデックス 0 はタスクが追加されたことを
166 // 通知するために利用するのでインデックスには 1 以上の値を指定します。
167 GetWaitHandleBuffer()[m_WaitingCount + 1] = task->GetWaitObject()->GetHandle();
168 }
169 }
170
171 // 同期待ち中のハンドルのいずれかがシグナル状態になるまで待ちます。
172 s32 n;
173 NN_UTIL_PANIC_IF_FAILED(nn::svc::WaitSynchronization(&n, GetWaitHandleBuffer(), m_WaitingCount + 1, false, WAIT_INFINITE));
174 // キューに同期タスクが追加されたとき (n == 0) は処理は行いません。
175 if (n >= 1)
176 {
177 // 同期待ちが完了したタスクをタスク実行バッファに移します。
178 QueueableTask* task;
179 {
180 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
181
182 task = GetWaitTaskBuffer()[n - 1]->AsNonWaitableTask();
183 GetWaitTaskBuffer()[n - 1] = GetWaitTaskBuffer()[m_WaitingCount - 1];
184 GetWaitHandleBuffer()[n] = GetWaitHandleBuffer()[m_WaitingCount];
185 --m_WaitingCount;
186 }
187 AddToExecuteQueue(task);
188 }
189 }
190 }
191
WaitThreadFunc(ThreadPool * this_)192 void ThreadPool::WaitThreadFunc(ThreadPool* this_)
193 {
194 this_->WaitThreadFunc();
195 }
196
197 // タスク実行ワーカースレッドのループ処理です。
ExecuteThreadFunc()198 inline void ThreadPool::ExecuteThreadFunc()
199 {
200 while (!m_Finalizing)
201 {
202 // タスク実行通知がくれば、待機中のワーカースレッドのいずれかがタスクを実行します。
203 m_ExecuteEvent.Wait();
204 if (m_Finalizing)
205 {
206 break;
207 }
208 // タスク取得中に実行タスクキューの内容を読み書きされないようにロックします。
209 m_ExecuteLock.Enter();
210 if (QueueableTask* task = m_ExecuteQueue.Dequeue())
211 {
212 if (!m_ExecuteQueue.IsEmpty())
213 {
214 m_ExecuteLock.Leave();
215 // 実行タスクキューに他のタスクが存在すれば、他のワーカースレッドを起床してタスクを実行させます。
216 m_ExecuteEvent.Signal();
217 }
218 else
219 {
220 m_ExecuteLock.Leave();
221 }
222 // タスクを実行します。
223 task->Invoke();
224 }
225 else
226 {
227 m_ExecuteLock.Leave();
228 }
229 }
230
231 // 他のワーカースレッドを起床し、終了処理を実行させます。
232 m_ExecuteEvent.Signal();
233 }
234
ExecuteThreadFunc(ThreadPool * this_)235 void ThreadPool::ExecuteThreadFunc(ThreadPool* this_)
236 {
237 this_->ExecuteThreadFunc();
238 }
239
240 }}
241
242
243 #include <new>
244 using namespace nn::os;
245 using namespace nn::os::detail;
246
nnosThreadPoolTaskInitialize(nnosThreadPoolTask * p,void (* f)(uptr),uptr param)247 NN_EXTERN_C void nnosThreadPoolTaskInitialize(nnosThreadPoolTask* p, void (*f)(uptr), uptr param)
248 {
249 new (p) ThreadPoolTaskForC(f, param);
250 }
251
nnosThreadPoolTaskFinalize(nnosThreadPoolTask * p)252 void nnosThreadPoolTaskFinalize(nnosThreadPoolTask* p)
253 {
254 ThreadPoolTaskForC* pThreadPoolTaskForC = reinterpret_cast<ThreadPoolTaskForC*>(p);
255 pThreadPoolTaskForC->~ThreadPoolTaskForC();
256 }
257
nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask * p,nnosWaitObject * waitObject,void (* f)(uptr),uptr param)258 NN_EXTERN_C void nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask* p, nnosWaitObject* waitObject, void (*f)(uptr), uptr param)
259 {
260 new (p) ThreadPoolWaitTaskForC(waitObject, f, param);
261 }
262
nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask * p)263 void nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask* p)
264 {
265 ThreadPoolWaitTaskForC* pThreadPoolWaitTaskForC = reinterpret_cast<ThreadPoolWaitTaskForC*>(p);
266 pThreadPoolWaitTaskForC->~ThreadPoolWaitTaskForC();
267 }
268
nnosThreadPoolInitialize(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,uptr workerStackBottoms[],s32 workerPriority)269 void nnosThreadPoolInitialize(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, uptr workerStackBottoms[], s32 workerPriority)
270 {
271 ThreadPool* pThreadPool = new (p) ThreadPool();
272 pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, workerStackBottoms, workerPriority);
273 }
274
275 #if NN_PLATFORM_HAS_MMU
nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,nnosStackMemoryBlock workerStacks[],s32 workerPriority)276 void nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, nnosStackMemoryBlock workerStacks[], s32 workerPriority)
277 {
278 ThreadPool* pThreadPool = new (p) ThreadPool();
279 StackMemoryBlock* pWorkerStacks = reinterpret_cast<StackMemoryBlock*>(workerStacks);
280 pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, pWorkerStacks, workerPriority);
281 }
282 #endif // if NN_PLATFORM_HAS_MMU
283
nnosThreadPoolFinalize(nnosThreadPool * p)284 void nnosThreadPoolFinalize(nnosThreadPool* p)
285 {
286 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
287 pThreadPool->Finalize();
288 }
289
nnosThreadPoolAddWaitTask(nnosThreadPool * p,nnosThreadPoolWaitTask * task)290 void nnosThreadPoolAddWaitTask(nnosThreadPool* p, nnosThreadPoolWaitTask* task)
291 {
292 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
293 ThreadPoolWaitTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolWaitTaskForC*>(task);
294 pThreadPool->AddWaitTask(pThreadPoolTask);
295 }
296
nnosThreadPoolAddTask(nnosThreadPool * p,nnosThreadPoolTask * task)297 void nnosThreadPoolAddTask(nnosThreadPool* p, nnosThreadPoolTask* task)
298 {
299 ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
300 ThreadPoolTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolTaskForC*>(task);
301 pThreadPool->AddTask(pThreadPoolTask);
302 }
303
nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects,size_t numWorkerThreads)304 size_t nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects, size_t numWorkerThreads)
305 {
306 return ThreadPool::GetWorkBufferSize(numMaxWaitObjects, numWorkerThreads);
307 }
308
309