1 /*---------------------------------------------------------------------------*
2   Project:  Horizon
3   File:     os_ThreadPool.cpp
4 
5   Copyright (C)2009 Nintendo Co., Ltd.  All rights reserved.
6 
7   These coded instructions, statements, and computer programs contain
8   proprietary information of Nintendo of America Inc. and/or Nintendo
9   Company Ltd., and are protected by Federal copyright law.  They may
10   not be disclosed to third parties or copied or duplicated in any form,
11   in whole or in part, without the prior written consent of Nintendo.
12 
13   $Rev: 29304 $
14  *---------------------------------------------------------------------------*/
15 
16 #include <nn/os/os_ThreadPool.h>
17 #include <nn/err.h>
18 #include <new>
19 
20 namespace nn { namespace os {
21 
GetThreads() const22 inline Thread* ThreadPool::GetThreads() const
23 {
24     return reinterpret_cast<Thread*>(m_Buffer);
25 }
26 
GetWaitHandleBuffer() const27 inline nn::Handle* ThreadPool::GetWaitHandleBuffer() const
28 {
29     return reinterpret_cast<nn::Handle*>(m_Buffer + sizeof(Thread) * m_NumThreads);
30 }
31 
GetWaitTaskBuffer() const32 inline QueueableWaitTask** ThreadPool::GetWaitTaskBuffer() const
33 {
34     return reinterpret_cast<QueueableWaitTask**>(m_Buffer + sizeof(Thread) * m_NumThreads + sizeof(nn::Handle) * (m_NumMaxWaitObjects + 1));
35 }
36 
GetWorkBufferSize(size_t numMaxWaitObjects,size_t numThreads)37 size_t ThreadPool::GetWorkBufferSize(size_t numMaxWaitObjects, size_t numThreads)
38 {
39     return sizeof(Thread) * numThreads
40          + sizeof(nn::Handle) * (numMaxWaitObjects + 1)
41          + sizeof(QueueableWaitTask*) * numMaxWaitObjects;
42 }
43 
InitializeCommon(size_t numMaxWaitObjects,size_t numThreads,void * workBuffer)44 inline void ThreadPool::InitializeCommon(size_t numMaxWaitObjects, size_t numThreads, void* workBuffer)
45 {
46     this->m_Finalizing = false;
47     this->m_WaitingCount = 0;
48     this->m_NumMaxWaitObjects = numMaxWaitObjects;
49     this->m_NumThreads = numThreads;
50     this->m_Buffer = reinterpret_cast<uptr>(workBuffer);
51     this->m_WaitEvent.Initialize(false);
52     this->m_ExecuteEvent.Initialize(false);
53     this->m_WaitLock.Initialize();
54     this->m_ExecuteLock.Initialize();
55 }
56 
57 namespace {
58     struct StackBufferAdapter
59     {
60         uptr stackBottom;
StackBufferAdapternn::os::__anon38abe66c0111::StackBufferAdapter61         StackBufferAdapter(uptr stackBottom) : stackBottom(stackBottom) {}
GetStackBottomnn::os::__anon38abe66c0111::StackBufferAdapter62         uptr GetStackBottom() const { return stackBottom; }
63     };
64 }
65 
66 // 待機タスクを管理するスレッドを開始します。
StartWaitThread()67 inline void ThreadPool::StartWaitThread()
68 {
69     GetWaitHandleBuffer()[0] = this->m_WaitEvent.GetHandle();
70     m_WaitThread.Start(ThreadPool::WaitThreadFunc, this, m_WaitThreadStack, 0);
71 }
72 
73 // タスクを実行するワーカースレッドを開始します。
StartExecuteThread(size_t i,uptr stackBottom,s32 priority)74 inline void ThreadPool::StartExecuteThread(size_t i, uptr stackBottom, s32 priority)
75 {
76     Thread* thread = new (GetThreads() + i) Thread();
77     StackBufferAdapter stack(stackBottom);
78     thread->Start(ThreadPool::ExecuteThreadFunc, this, stack, priority);
79 }
80 
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,uptr stackBottoms[],s32 priority)81 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, uptr stackBottoms[], s32 priority)
82 {
83     // TODO: workBuffer の NULL チェックを推奨します。
84 
85     InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
86     for (size_t i = 0; i < numThreads; i++)
87     {
88         StartExecuteThread(i, stackBottoms[i], priority);
89     }
90     StartWaitThread();
91 }
92 
93 #if NN_PLATFORM_HAS_MMU
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,nn::os::StackMemoryBlock stacks[],s32 priority)94 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, nn::os::StackMemoryBlock stacks[], s32 priority)
95 {
96     // TODO: workBuffer の NULL チェックを推奨します。
97 
98     InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
99     for (size_t i = 0; i < numThreads; i++)
100     {
101         StartExecuteThread(i, stacks[i].GetStackBottom(), priority);
102     }
103     StartWaitThread();
104 }
105 #endif  // if NN_PLATFORM_HAS_MMU
106 
Finalize()107 void ThreadPool::Finalize()
108 {
109     // タスク待機スレッド、ワーカースレッドを停止します。
110     this->m_Finalizing = true;
111 
112     m_WaitEvent.Signal();
113     m_ExecuteEvent.Signal();
114 
115     for (size_t i = 0; i < m_NumThreads; i++)
116     {
117         Thread& thread = GetThreads()[i];
118         thread.Join();
119         thread.Finalize();
120     }
121 
122     m_WaitThread.Join();
123     m_WaitThread.Finalize();
124 }
125 
AddToExecuteQueue(QueueableTask * task)126 void ThreadPool::AddToExecuteQueue(QueueableTask* task)
127 {
128     {
129         nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
130 
131         m_ExecuteQueue.Enqueue(task);
132     }
133 
134     // タスクを実行するワーカースレッドを起床します。
135     m_ExecuteEvent.Signal();
136 }
137 
AddToWaitQueue(QueueableWaitTask * task)138 void ThreadPool::AddToWaitQueue(QueueableWaitTask* task)
139 {
140     {
141         nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
142 
143         m_WaitQueue.Enqueue(task->AsNonWaitableTask());
144     }
145 
146     // 同期待ちスレッドを起床します。
147     m_WaitEvent.Signal();
148 }
149 
150 // 同期待ちスレッドでのループ処理です。
WaitThreadFunc()151 inline void ThreadPool::WaitThreadFunc()
152 {
153     // この関数のスタックサイズ(WAIT_THREAD_STACK_SIZE)は固定で小さめに取っている。
154     // コードを修正する際に注意すること。
155     // 必要であれば、スタックサイズを修正する。
156     while (!m_Finalizing)
157     {
158         {
159             nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
160 
161             // 新しく同期タスクキューに追加されたタスクを同期待ちハンドルバッファに追加します。
162             for (; m_WaitingCount < m_NumMaxWaitObjects && !m_WaitQueue.IsEmpty(); ++m_WaitingCount)
163             {
164                 QueueableWaitTask* task = GetWaitTaskPointer(m_WaitQueue.Dequeue());
165                 GetWaitTaskBuffer()[m_WaitingCount] = task;
166                 // 同期待ちハンドルバッファのインデックス 0 はタスクが追加されたことを
167                 // 通知するために利用するのでインデックスには 1 以上の値を指定します。
168                 GetWaitHandleBuffer()[m_WaitingCount + 1] = task->GetWaitObject()->GetHandle();
169             }
170         }
171 
172         // 同期待ち中のハンドルのいずれかがシグナル状態になるまで待ちます。
173         s32 n;
174         NN_ERR_THROW_FATAL(nn::svc::WaitSynchronization(&n, GetWaitHandleBuffer(), m_WaitingCount + 1, false, WAIT_INFINITE));
175         // キューに同期タスクが追加されたとき (n == 0) は処理は行いません。
176         if (n >= 1)
177         {
178             // 同期待ちが完了したタスクをタスク実行バッファに移します。
179             QueueableTask* task;
180             {
181                 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
182 
183                 task = GetWaitTaskBuffer()[n - 1]->AsNonWaitableTask();
184                 GetWaitTaskBuffer()[n - 1] = GetWaitTaskBuffer()[m_WaitingCount - 1];
185                 GetWaitHandleBuffer()[n] = GetWaitHandleBuffer()[m_WaitingCount];
186                 --m_WaitingCount;
187             }
188             AddToExecuteQueue(task);
189         }
190     }
191 }
192 
WaitThreadFunc(ThreadPool * this_)193 void ThreadPool::WaitThreadFunc(ThreadPool* this_)
194 {
195     this_->WaitThreadFunc();
196 }
197 
198 // タスク実行ワーカースレッドのループ処理です。
ExecuteThreadFunc()199 inline void ThreadPool::ExecuteThreadFunc()
200 {
201     while (!m_Finalizing)
202     {
203         // タスク実行通知がくれば、待機中のワーカースレッドのいずれかがタスクを実行します。
204         m_ExecuteEvent.Wait();
205         if (m_Finalizing)
206         {
207             break;
208         }
209         // タスク取得中に実行タスクキューの内容を読み書きされないようにロックします。
210         m_ExecuteLock.Enter();
211         if (QueueableTask* task = m_ExecuteQueue.Dequeue())
212         {
213             if (!m_ExecuteQueue.IsEmpty())
214             {
215                 m_ExecuteLock.Leave();
216                 // 実行タスクキューに他のタスクが存在すれば、他のワーカースレッドを起床してタスクを実行させます。
217                 m_ExecuteEvent.Signal();
218             }
219             else
220             {
221                 m_ExecuteLock.Leave();
222             }
223             // タスクを実行します。
224             task->Invoke();
225         }
226         else
227         {
228             m_ExecuteLock.Leave();
229         }
230     }
231 
232     // 他のワーカースレッドを起床し、終了処理を実行させます。
233     m_ExecuteEvent.Signal();
234 }
235 
ExecuteThreadFunc(ThreadPool * this_)236 void ThreadPool::ExecuteThreadFunc(ThreadPool* this_)
237 {
238     this_->ExecuteThreadFunc();
239 }
240 
241 // -----------------------------------------------------------------------
242 
243 
244 
Initialize(uptr stackBottom,s32 priority)245 void SingleThreadPool::Initialize(uptr stackBottom, s32 priority)
246 {
247     this->m_ExecuteEvent.Initialize(false);
248     this->m_ExecuteLock.Initialize();
249     StackBufferAdapter stack(stackBottom);
250     m_WorkerThread.Start(SingleThreadPool::ExecuteThreadFunc, this, stack, priority);
251 }
252 
Finalize()253 void SingleThreadPool::Finalize()
254 {
255     // タスク待機スレッド、ワーカースレッドを停止します。
256     if(!m_Finalizing)
257     {
258         this->m_Finalizing = true;
259         m_ExecuteEvent.Signal();
260         m_WorkerThread.Join();
261         m_WorkerThread.Finalize();
262     }
263 }
264 
265 // タスク実行ワーカースレッドのループ処理です。
ExecuteThreadFunc()266 inline void SingleThreadPool::ExecuteThreadFunc()
267 {
268     while (!m_Finalizing)
269     {
270         // タスク実行通知がくれば、待機中のワーカースレッドのいずれかがタスクを実行します。
271         m_ExecuteEvent.Wait();
272         if (m_Finalizing)
273         {
274             break;
275         }
276         // タスク取得中に実行タスクキューの内容を読み書きされないようにロックします。
277         m_ExecuteLock.Enter();
278         if (QueueableTask* task = m_ExecuteQueue.Dequeue())
279         {
280             if (!m_ExecuteQueue.IsEmpty())
281             {
282                 m_ExecuteLock.Leave();
283                 // 実行タスクキューに他のタスクが存在すれば、即座に次を実行できるようにイベントを通知しておく
284                 m_ExecuteEvent.Signal();
285             }
286             else
287             {
288                 m_ExecuteLock.Leave();
289             }
290             // タスクを実行します。
291             task->Invoke();
292         }
293         else
294         {
295             m_ExecuteLock.Leave();
296         }
297     }
298 }
299 
ExecuteThreadFunc(SingleThreadPool * this_)300 void SingleThreadPool::ExecuteThreadFunc(SingleThreadPool* this_)
301 {
302     this_->ExecuteThreadFunc();
303 }
304 
AddToExecuteQueue(QueueableTask * task)305 void SingleThreadPool::AddToExecuteQueue(QueueableTask* task)
306 {
307     {
308         nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
309 
310         m_ExecuteQueue.Enqueue(task);
311     }
312     // タスクを実行するワーカースレッドを起床します。
313     m_ExecuteEvent.Signal();
314 }
315 
316 }} // namespace nn::os
317 
318 #include <new>
319 using namespace nn::os;
320 using namespace nn::os::detail;
321 
nnosThreadPoolTaskInitialize(nnosThreadPoolTask * p,void (* f)(uptr),uptr param)322 NN_EXTERN_C void nnosThreadPoolTaskInitialize(nnosThreadPoolTask* p, void (*f)(uptr), uptr param)
323 {
324     new (p) ThreadPoolTaskForC(f, param);
325 }
326 
nnosThreadPoolTaskFinalize(nnosThreadPoolTask * p)327 void nnosThreadPoolTaskFinalize(nnosThreadPoolTask* p)
328 {
329     ThreadPoolTaskForC* pThreadPoolTaskForC = reinterpret_cast<ThreadPoolTaskForC*>(p);
330     pThreadPoolTaskForC->~ThreadPoolTaskForC();
331 }
332 
nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask * p,nnosWaitObject * waitObject,void (* f)(uptr),uptr param)333 NN_EXTERN_C void nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask* p, nnosWaitObject* waitObject, void (*f)(uptr), uptr param)
334 {
335     new (p) ThreadPoolWaitTaskForC(waitObject, f, param);
336 }
337 
nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask * p)338 void nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask* p)
339 {
340     ThreadPoolWaitTaskForC* pThreadPoolWaitTaskForC = reinterpret_cast<ThreadPoolWaitTaskForC*>(p);
341     pThreadPoolWaitTaskForC->~ThreadPoolWaitTaskForC();
342 }
343 
nnosThreadPoolInitialize(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,uptr workerStackBottoms[],s32 workerPriority)344 void nnosThreadPoolInitialize(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, uptr workerStackBottoms[], s32 workerPriority)
345 {
346     ThreadPool* pThreadPool = new (p) ThreadPool();
347     pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, workerStackBottoms, workerPriority);
348 }
349 
350 #if NN_PLATFORM_HAS_MMU
nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,nnosStackMemoryBlock workerStacks[],s32 workerPriority)351 void nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, nnosStackMemoryBlock workerStacks[], s32 workerPriority)
352 {
353     ThreadPool* pThreadPool = new (p) ThreadPool();
354     StackMemoryBlock* pWorkerStacks = reinterpret_cast<StackMemoryBlock*>(workerStacks);
355     pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, pWorkerStacks, workerPriority);
356 }
357 #endif  // if NN_PLATFORM_HAS_MMU
358 
nnosThreadPoolFinalize(nnosThreadPool * p)359 void nnosThreadPoolFinalize(nnosThreadPool* p)
360 {
361     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
362     pThreadPool->Finalize();
363 }
364 
nnosThreadPoolAddWaitTask(nnosThreadPool * p,nnosThreadPoolWaitTask * task)365 void nnosThreadPoolAddWaitTask(nnosThreadPool* p, nnosThreadPoolWaitTask* task)
366 {
367     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
368     ThreadPoolWaitTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolWaitTaskForC*>(task);
369     pThreadPool->AddWaitTask(pThreadPoolTask);
370 }
371 
nnosThreadPoolAddTask(nnosThreadPool * p,nnosThreadPoolTask * task)372 void nnosThreadPoolAddTask(nnosThreadPool* p, nnosThreadPoolTask* task)
373 {
374     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
375     ThreadPoolTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolTaskForC*>(task);
376     pThreadPool->AddTask(pThreadPoolTask);
377 }
378 
nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects,size_t numWorkerThreads)379 size_t nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects, size_t numWorkerThreads)
380 {
381     return ThreadPool::GetWorkBufferSize(numMaxWaitObjects, numWorkerThreads);
382 }
383 
384