1 /*---------------------------------------------------------------------------*
2   Project:  Horizon
3   File:     os_ThreadPool.cpp
4 
5   Copyright (C)2009 Nintendo Co., Ltd.  All rights reserved.
6 
7   These coded instructions, statements, and computer programs contain
8   proprietary information of Nintendo of America Inc. and/or Nintendo
9   Company Ltd., and are protected by Federal copyright law.  They may
10   not be disclosed to third parties or copied or duplicated in any form,
11   in whole or in part, without the prior written consent of Nintendo.
12 
13   $Rev: 24252 $
14  *---------------------------------------------------------------------------*/
15 
16 #include <nn/os/os_ThreadPool.h>
17 #include <new>
18 
19 namespace nn { namespace os {
20 
GetThreads() const21 inline Thread* ThreadPool::GetThreads() const
22 {
23     return reinterpret_cast<Thread*>(m_Buffer);
24 }
25 
GetWaitHandleBuffer() const26 inline nn::Handle* ThreadPool::GetWaitHandleBuffer() const
27 {
28     return reinterpret_cast<nn::Handle*>(m_Buffer + sizeof(Thread) * m_NumThreads);
29 }
30 
GetWaitTaskBuffer() const31 inline QueueableWaitTask** ThreadPool::GetWaitTaskBuffer() const
32 {
33     return reinterpret_cast<QueueableWaitTask**>(m_Buffer + sizeof(Thread) * m_NumThreads + sizeof(nn::Handle) * (m_NumMaxWaitObjects + 1));
34 }
35 
GetWorkBufferSize(size_t numMaxWaitObjects,size_t numThreads)36 size_t ThreadPool::GetWorkBufferSize(size_t numMaxWaitObjects, size_t numThreads)
37 {
38     return sizeof(Thread) * numThreads
39          + sizeof(nn::Handle) * (numMaxWaitObjects + 1)
40          + sizeof(QueueableWaitTask*) * numMaxWaitObjects;
41 }
42 
InitializeCommon(size_t numMaxWaitObjects,size_t numThreads,void * workBuffer)43 inline void ThreadPool::InitializeCommon(size_t numMaxWaitObjects, size_t numThreads, void* workBuffer)
44 {
45     this->m_Finalizing = false;
46     this->m_WaitingCount = 0;
47     this->m_NumMaxWaitObjects = numMaxWaitObjects;
48     this->m_NumThreads = numThreads;
49     this->m_Buffer = reinterpret_cast<uptr>(workBuffer);
50     this->m_WaitEvent.Initialize(false);
51     this->m_ExecuteEvent.Initialize(false);
52     this->m_WaitLock.Initialize();
53     this->m_ExecuteLock.Initialize();
54 }
55 
56 namespace {
57     struct StackBufferAdapter
58     {
59         uptr stackBottom;
StackBufferAdapternn::os::__anonbd42df890111::StackBufferAdapter60         StackBufferAdapter(uptr stackBottom) : stackBottom(stackBottom) {}
GetStackBottomnn::os::__anonbd42df890111::StackBufferAdapter61         uptr GetStackBottom() const { return stackBottom; }
62     };
63 }
64 
65 // 待機タスクを管理するスレッドを開始します。
StartWaitThread()66 inline void ThreadPool::StartWaitThread()
67 {
68     GetWaitHandleBuffer()[0] = this->m_WaitEvent.GetHandle();
69     m_WaitThread.Start(ThreadPool::WaitThreadFunc, this, m_WaitThreadStack, 0);
70 }
71 
72 // タスクを実行するワーカースレッドを開始します。
StartExecuteThread(size_t i,uptr stackBottom,s32 priority)73 inline void ThreadPool::StartExecuteThread(size_t i, uptr stackBottom, s32 priority)
74 {
75     Thread* thread = new (GetThreads() + i) Thread();
76     StackBufferAdapter stack(stackBottom);
77     thread->Start(ThreadPool::ExecuteThreadFunc, this, stack, priority);
78 }
79 
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,uptr stackBottoms[],s32 priority)80 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, uptr stackBottoms[], s32 priority)
81 {
82     // TODO: workBuffer の NULL チェックを推奨します。
83 
84     InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
85     for (size_t i = 0; i < numThreads; i++)
86     {
87         StartExecuteThread(i, stackBottoms[i], priority);
88     }
89     StartWaitThread();
90 }
91 
#if NN_PLATFORM_HAS_MMU
// Initializes the pool using StackMemoryBlock objects for the worker stacks.
// Same as the stack-bottom overload, except that each worker's stack bottom
// is taken from stacks[i].GetStackBottom().
void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, nn::os::StackMemoryBlock stacks[], s32 priority)
{
    // TODO: a NULL check of workBuffer is recommended.

    InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);

    for (size_t workerIndex = 0; workerIndex < numThreads; ++workerIndex)
    {
        const uptr bottom = stacks[workerIndex].GetStackBottom();
        StartExecuteThread(workerIndex, bottom, priority);
    }

    StartWaitThread();
}
#endif  // if NN_PLATFORM_HAS_MMU
105 
Finalize()106 void ThreadPool::Finalize()
107 {
108     // タスク待機スレッド、ワーカースレッドを停止します。
109     this->m_Finalizing = true;
110 
111     m_WaitEvent.Signal();
112     m_ExecuteEvent.Signal();
113 
114     for (size_t i = 0; i < m_NumThreads; i++)
115     {
116         Thread& thread = GetThreads()[i];
117         thread.Join();
118         thread.Finalize();
119     }
120 
121     m_WaitThread.Join();
122     m_WaitThread.Finalize();
123 }
124 
AddToExecuteQueue(QueueableTask * task)125 void ThreadPool::AddToExecuteQueue(QueueableTask* task)
126 {
127     {
128         nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
129 
130         m_ExecuteQueue.Enqueue(task);
131     }
132 
133     // タスクを実行するワーカースレッドを起床します。
134     m_ExecuteEvent.Signal();
135 }
136 
AddToWaitQueue(QueueableWaitTask * task)137 void ThreadPool::AddToWaitQueue(QueueableWaitTask* task)
138 {
139     {
140         nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
141 
142         m_WaitQueue.Enqueue(task->AsNonWaitableTask());
143     }
144 
145     // 同期待ちスレッドを起床します。
146     m_WaitEvent.Signal();
147 }
148 
149 // 同期待ちスレッドでのループ処理です。
WaitThreadFunc()150 inline void ThreadPool::WaitThreadFunc()
151 {
152     // この関数のスタックサイズ(WAIT_THREAD_STACK_SIZE)は固定で小さめに取っている。
153     // コードを修正する際に注意すること。
154     // 必要であれば、スタックサイズを修正する。
155     while (!m_Finalizing)
156     {
157         {
158             nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
159 
160             // 新しく同期タスクキューに追加されたタスクを同期待ちハンドルバッファに追加します。
161             for (; m_WaitingCount < m_NumMaxWaitObjects && !m_WaitQueue.IsEmpty(); ++m_WaitingCount)
162             {
163                 QueueableWaitTask* task = GetWaitTaskPointer(m_WaitQueue.Dequeue());
164                 GetWaitTaskBuffer()[m_WaitingCount] = task;
165                 // 同期待ちハンドルバッファのインデックス 0 はタスクが追加されたことを
166                 // 通知するために利用するのでインデックスには 1 以上の値を指定します。
167                 GetWaitHandleBuffer()[m_WaitingCount + 1] = task->GetWaitObject()->GetHandle();
168             }
169         }
170 
171         // 同期待ち中のハンドルのいずれかがシグナル状態になるまで待ちます。
172         s32 n;
173         NN_UTIL_PANIC_IF_FAILED(nn::svc::WaitSynchronization(&n, GetWaitHandleBuffer(), m_WaitingCount + 1, false, WAIT_INFINITE));
174         // キューに同期タスクが追加されたとき (n == 0) は処理は行いません。
175         if (n >= 1)
176         {
177             // 同期待ちが完了したタスクをタスク実行バッファに移します。
178             QueueableTask* task;
179             {
180                 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
181 
182                 task = GetWaitTaskBuffer()[n - 1]->AsNonWaitableTask();
183                 GetWaitTaskBuffer()[n - 1] = GetWaitTaskBuffer()[m_WaitingCount - 1];
184                 GetWaitHandleBuffer()[n] = GetWaitHandleBuffer()[m_WaitingCount];
185                 --m_WaitingCount;
186             }
187             AddToExecuteQueue(task);
188         }
189     }
190 }
191 
WaitThreadFunc(ThreadPool * this_)192 void ThreadPool::WaitThreadFunc(ThreadPool* this_)
193 {
194     this_->WaitThreadFunc();
195 }
196 
197 // タスク実行ワーカースレッドのループ処理です。
ExecuteThreadFunc()198 inline void ThreadPool::ExecuteThreadFunc()
199 {
200     while (!m_Finalizing)
201     {
202         // タスク実行通知がくれば、待機中のワーカースレッドのいずれかがタスクを実行します。
203         m_ExecuteEvent.Wait();
204         if (m_Finalizing)
205         {
206             break;
207         }
208         // タスク取得中に実行タスクキューの内容を読み書きされないようにロックします。
209         m_ExecuteLock.Enter();
210         if (QueueableTask* task = m_ExecuteQueue.Dequeue())
211         {
212             if (!m_ExecuteQueue.IsEmpty())
213             {
214                 m_ExecuteLock.Leave();
215                 // 実行タスクキューに他のタスクが存在すれば、他のワーカースレッドを起床してタスクを実行させます。
216                 m_ExecuteEvent.Signal();
217             }
218             else
219             {
220                 m_ExecuteLock.Leave();
221             }
222             // タスクを実行します。
223             task->Invoke();
224         }
225         else
226         {
227             m_ExecuteLock.Leave();
228         }
229     }
230 
231     // 他のワーカースレッドを起床し、終了処理を実行させます。
232     m_ExecuteEvent.Signal();
233 }
234 
ExecuteThreadFunc(ThreadPool * this_)235 void ThreadPool::ExecuteThreadFunc(ThreadPool* this_)
236 {
237     this_->ExecuteThreadFunc();
238 }
239 
240 }}
241 
242 
243 #include <new>
244 using namespace nn::os;
245 using namespace nn::os::detail;
246 
nnosThreadPoolTaskInitialize(nnosThreadPoolTask * p,void (* f)(uptr),uptr param)247 NN_EXTERN_C void nnosThreadPoolTaskInitialize(nnosThreadPoolTask* p, void (*f)(uptr), uptr param)
248 {
249     new (p) ThreadPoolTaskForC(f, param);
250 }
251 
nnosThreadPoolTaskFinalize(nnosThreadPoolTask * p)252 void nnosThreadPoolTaskFinalize(nnosThreadPoolTask* p)
253 {
254     ThreadPoolTaskForC* pThreadPoolTaskForC = reinterpret_cast<ThreadPoolTaskForC*>(p);
255     pThreadPoolTaskForC->~ThreadPoolTaskForC();
256 }
257 
nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask * p,nnosWaitObject * waitObject,void (* f)(uptr),uptr param)258 NN_EXTERN_C void nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask* p, nnosWaitObject* waitObject, void (*f)(uptr), uptr param)
259 {
260     new (p) ThreadPoolWaitTaskForC(waitObject, f, param);
261 }
262 
nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask * p)263 void nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask* p)
264 {
265     ThreadPoolWaitTaskForC* pThreadPoolWaitTaskForC = reinterpret_cast<ThreadPoolWaitTaskForC*>(p);
266     pThreadPoolWaitTaskForC->~ThreadPoolWaitTaskForC();
267 }
268 
nnosThreadPoolInitialize(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,uptr workerStackBottoms[],s32 workerPriority)269 void nnosThreadPoolInitialize(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, uptr workerStackBottoms[], s32 workerPriority)
270 {
271     ThreadPool* pThreadPool = new (p) ThreadPool();
272     pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, workerStackBottoms, workerPriority);
273 }
274 
#if NN_PLATFORM_HAS_MMU
// C API: constructs a ThreadPool in the storage pointed to by p and
// initializes it with worker stacks given as stack memory blocks.
// The C-side nnosStackMemoryBlock array is reinterpreted as StackMemoryBlock.
void nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, nnosStackMemoryBlock workerStacks[], s32 workerPriority)
{
    StackMemoryBlock* const stackBlocks = reinterpret_cast<StackMemoryBlock*>(workerStacks);

    ThreadPool* const pool = new (p) ThreadPool();
    pool->Initialize(workBuffer,
                     numMaxWaitObjects,
                     numWorkerThreads,
                     stackBlocks,
                     workerPriority);
}
#endif  // if NN_PLATFORM_HAS_MMU
283 
nnosThreadPoolFinalize(nnosThreadPool * p)284 void nnosThreadPoolFinalize(nnosThreadPool* p)
285 {
286     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
287     pThreadPool->Finalize();
288 }
289 
nnosThreadPoolAddWaitTask(nnosThreadPool * p,nnosThreadPoolWaitTask * task)290 void nnosThreadPoolAddWaitTask(nnosThreadPool* p, nnosThreadPoolWaitTask* task)
291 {
292     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
293     ThreadPoolWaitTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolWaitTaskForC*>(task);
294     pThreadPool->AddWaitTask(pThreadPoolTask);
295 }
296 
nnosThreadPoolAddTask(nnosThreadPool * p,nnosThreadPoolTask * task)297 void nnosThreadPoolAddTask(nnosThreadPool* p, nnosThreadPoolTask* task)
298 {
299     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
300     ThreadPoolTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolTaskForC*>(task);
301     pThreadPool->AddTask(pThreadPoolTask);
302 }
303 
nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects,size_t numWorkerThreads)304 size_t nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects, size_t numWorkerThreads)
305 {
306     return ThreadPool::GetWorkBufferSize(numMaxWaitObjects, numWorkerThreads);
307 }
308 
309