1 /*---------------------------------------------------------------------------*
2   Project:  Horizon
3   File:     os_ThreadPool.cpp
4 
5   Copyright (C)2009-2012 Nintendo Co., Ltd.  All rights reserved.
6 
7   These coded instructions, statements, and computer programs contain
8   proprietary information of Nintendo of America Inc. and/or Nintendo
9   Company Ltd., and are protected by Federal copyright law.  They may
10   not be disclosed to third parties or copied or duplicated in any form,
11   in whole or in part, without the prior written consent of Nintendo.
12 
13   $Rev: 46347 $
14  *---------------------------------------------------------------------------*/
15 
16 #include <nn/os/os_ThreadPool.h>
17 #include <nn/os/os_ErrorHandlerSelect.h>
18 #include <new>
19 
20 namespace nn { namespace os {
21 
GetThreads() const22 inline Thread* ThreadPool::GetThreads() const
23 {
24     return reinterpret_cast<Thread*>(m_Buffer);
25 }
26 
GetWaitHandleBuffer() const27 inline nn::Handle* ThreadPool::GetWaitHandleBuffer() const
28 {
29     return reinterpret_cast<nn::Handle*>(m_Buffer + sizeof(Thread) * m_NumThreads);
30 }
31 
GetWaitTaskBuffer() const32 inline QueueableWaitTask** ThreadPool::GetWaitTaskBuffer() const
33 {
34     return reinterpret_cast<QueueableWaitTask**>(m_Buffer + sizeof(Thread) * m_NumThreads + sizeof(nn::Handle) * (m_NumMaxWaitObjects + 1));
35 }
36 
GetWorkBufferSize(size_t numMaxWaitObjects,size_t numThreads)37 size_t ThreadPool::GetWorkBufferSize(size_t numMaxWaitObjects, size_t numThreads)
38 {
39     return sizeof(Thread) * numThreads
40          + sizeof(nn::Handle) * (numMaxWaitObjects + 1)
41          + sizeof(QueueableWaitTask*) * numMaxWaitObjects;
42 }
43 
InitializeCommon(size_t numMaxWaitObjects,size_t numThreads,void * workBuffer)44 inline void ThreadPool::InitializeCommon(size_t numMaxWaitObjects, size_t numThreads, void* workBuffer)
45 {
46     this->m_Finalizing = false;
47     this->m_WaitingCount = 0;
48     this->m_NumMaxWaitObjects = numMaxWaitObjects;
49     this->m_NumThreads = numThreads;
50     this->m_Buffer = reinterpret_cast<uptr>(workBuffer);
51     this->m_WaitEvent.Initialize(false);
52     this->m_ExecuteEvent.Initialize(false);
53     this->m_WaitLock.Initialize();
54     this->m_ExecuteLock.Initialize();
55 }
56 
57 namespace {
58     struct StackBufferAdapter
59     {
60         uptr stackBottom;
StackBufferAdapternn::os::__anonad337e860111::StackBufferAdapter61         StackBufferAdapter(uptr stackBottom) : stackBottom(stackBottom) {}
GetStackBottomnn::os::__anonad337e860111::StackBufferAdapter62         uptr GetStackBottom() const { return stackBottom; }
63     };
64 }
65 
66 // Starts the thread to manage tasks on standby.
StartWaitThread(s32 priority)67 inline void ThreadPool::StartWaitThread(s32 priority)
68 {
69     GetWaitHandleBuffer()[0] = this->m_WaitEvent.GetHandle();
70     m_WaitThread.Start(ThreadPool::WaitThreadFunc, this, m_WaitThreadStack, priority);
71 }
72 
73 // Starts worker thread to run tasks.
StartExecuteThread(size_t i,uptr stackBottom,s32 priority)74 inline void ThreadPool::StartExecuteThread(size_t i, uptr stackBottom, s32 priority)
75 {
76     Thread* thread = new (GetThreads() + i) Thread();
77     StackBufferAdapter stack(stackBottom);
78     thread->Start(ThreadPool::ExecuteThreadFunc, this, stack, priority);
79 }
80 
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,uptr stackBottoms[],s32 workerPriority,s32 waitPriority)81 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, uptr stackBottoms[], s32 workerPriority, s32 waitPriority)
82 {
83     // TODO: Recommend NULL check of workBuffer.
84 
85     InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
86     for (size_t i = 0; i < numThreads; i++)
87     {
88         StartExecuteThread(i, stackBottoms[i], workerPriority);
89     }
90     StartWaitThread(waitPriority);
91 }
92 
93 #if NN_PLATFORM_HAS_MMU
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,nn::os::StackMemoryBlock stacks[],s32 workerPriority,s32 waitPriority)94 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, nn::os::StackMemoryBlock stacks[], s32 workerPriority, s32 waitPriority)
95 {
96     // TODO: Recommend NULL check of workBuffer.
97 
98     InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
99     for (size_t i = 0; i < numThreads; i++)
100     {
101         StartExecuteThread(i, stacks[i].GetStackBottom(), workerPriority);
102     }
103     StartWaitThread(waitPriority);
104 }
105 #endif  // if NN_PLATFORM_HAS_MMU
106 
Finalize()107 void ThreadPool::Finalize()
108 {
109     // Stops the task standby thread and worker thread.
110     this->m_Finalizing = true;
111 
112     m_WaitEvent.Signal();
113     m_ExecuteEvent.Signal();
114 
115     for (size_t i = 0; i < m_NumThreads; i++)
116     {
117         Thread& thread = GetThreads()[i];
118         thread.Join();
119         thread.Finalize();
120     }
121 
122     m_WaitThread.Join();
123     m_WaitThread.Finalize();
124 }
125 
AddToExecuteQueue(QueueableTask * task)126 void ThreadPool::AddToExecuteQueue(QueueableTask* task)
127 {
128     {
129         nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
130 
131         m_ExecuteQueue.Enqueue(task);
132     }
133 
134     // Wakes up the worker thread to run tasks.
135     m_ExecuteEvent.Signal();
136 }
137 
AddToWaitQueue(QueueableWaitTask * task)138 void ThreadPool::AddToWaitQueue(QueueableWaitTask* task)
139 {
140     {
141         nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
142 
143         m_WaitQueue.Enqueue(task->AsNonWaitableTask());
144     }
145 
146     // Wakes up synchronization wait thread.
147     m_WaitEvent.Signal();
148 }
149 
150 // Loop processing for the synchronization wait thread.
WaitThreadFunc()151 inline void ThreadPool::WaitThreadFunc()
152 {
153     // The stack size of this function (WAIT_THREAD_STACK_SIZE) is fixed and small.
154     // Take note when revising the code.
155     // Revise the stack size if necessary.
156     while (!m_Finalizing)
157     {
158         {
159             nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
160 
161             // Adds new tasks added to the synchronization task queue to the synchronization wait handle buffer.
162             for (; m_WaitingCount < m_NumMaxWaitObjects && !m_WaitQueue.IsEmpty(); ++m_WaitingCount)
163             {
164                 QueueableWaitTask* task = GetWaitTaskPointer(m_WaitQueue.Dequeue());
165                 GetWaitTaskBuffer()[m_WaitingCount] = task;
166                 // Specifies a value of 1 or greater in the index because a 0 value for the synchronization wait handle buffer index is used to notify that a task has been added.
167                 //
168                 GetWaitHandleBuffer()[m_WaitingCount + 1] = task->GetWaitObject()->GetHandle();
169             }
170         }
171 
172         // Waits until one of the handles waiting for synchronization enters a signal state.
173         s32 n;
174         NN_OS_ERROR_IF_FAILED(nn::svc::WaitSynchronization(&n, GetWaitHandleBuffer(), m_WaitingCount + 1, false, WAIT_INFINITE));
175         // Does not process when a synchronization task is added to the queue (n == 0).
176         if (n >= 1)
177         {
178             // Move tasks that have completed synchronization wait to the task execution buffer.
179             QueueableTask* task;
180             {
181                 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
182 
183                 task = GetWaitTaskBuffer()[n - 1]->AsNonWaitableTask();
184                 GetWaitTaskBuffer()[n - 1] = GetWaitTaskBuffer()[m_WaitingCount - 1];
185                 GetWaitHandleBuffer()[n] = GetWaitHandleBuffer()[m_WaitingCount];
186                 --m_WaitingCount;
187             }
188             AddToExecuteQueue(task);
189         }
190     }
191 }
192 
WaitThreadFunc(ThreadPool * this_)193 void ThreadPool::WaitThreadFunc(ThreadPool* this_)
194 {
195     this_->WaitThreadFunc();
196 }
197 
198 // Loop processing for the task execution worker thread.
ExecuteThreadFunc()199 inline void ThreadPool::ExecuteThreadFunc()
200 {
201     while (!m_Finalizing)
202     {
203         // If notification of task execution is received, one of the worker threads on standby runs a task.
204         m_ExecuteEvent.Wait();
205         if (m_Finalizing)
206         {
207             break;
208         }
209         // Locks so that the execution task queue content cannot be read or written while obtaining a task.
210         m_ExecuteLock.Enter();
211         if (QueueableTask* task = m_ExecuteQueue.Dequeue())
212         {
213             if (!m_ExecuteQueue.IsEmpty())
214             {
215                 m_ExecuteLock.Leave();
216                 // If other tasks exist in the execution task queue, another worker thread is awakened and a task is started.
217                 m_ExecuteEvent.Signal();
218             }
219             else
220             {
221                 m_ExecuteLock.Leave();
222             }
223             // Executes a task.
224             task->Invoke();
225         }
226         else
227         {
228             m_ExecuteLock.Leave();
229         }
230     }
231 
232     // Wakes another worker thread, and executes the finalization process.
233     m_ExecuteEvent.Signal();
234 }
235 
ExecuteThreadFunc(ThreadPool * this_)236 void ThreadPool::ExecuteThreadFunc(ThreadPool* this_)
237 {
238     this_->ExecuteThreadFunc();
239 }
240 
241 // -----------------------------------------------------------------------
242 
243 
244 
Initialize(uptr stackBottom,s32 priority)245 void SingleThreadPool::Initialize(uptr stackBottom, s32 priority)
246 {
247     this->m_ExecuteEvent.Initialize(false);
248     this->m_ExecuteLock.Initialize();
249     StackBufferAdapter stack(stackBottom);
250     m_WorkerThread.Start(SingleThreadPool::ExecuteThreadFunc, this, stack, priority);
251 }
252 
Finalize()253 void SingleThreadPool::Finalize()
254 {
255     // Stops the task standby thread and worker thread.
256     if(!m_Finalizing)
257     {
258         this->m_Finalizing = true;
259         m_ExecuteEvent.Signal();
260         m_WorkerThread.Join();
261         m_WorkerThread.Finalize();
262     }
263 }
264 
265 // Loop processing for the task execution worker thread.
ExecuteThreadFunc()266 inline void SingleThreadPool::ExecuteThreadFunc()
267 {
268     while (!m_Finalizing)
269     {
270         // If notification of task execution is received, one of the worker threads on standby runs a task.
271         m_ExecuteEvent.Wait();
272         if (m_Finalizing)
273         {
274             break;
275         }
276         // Locks so that the execution task queue content cannot be read or written while obtaining a task.
277         m_ExecuteLock.Enter();
278         if (QueueableTask* task = m_ExecuteQueue.Dequeue())
279         {
280             if (!m_ExecuteQueue.IsEmpty())
281             {
282                 m_ExecuteLock.Leave();
283                 // If another task exists in the execution task queue, an event notification is made so that the next execution can be done immediately
284                 m_ExecuteEvent.Signal();
285             }
286             else
287             {
288                 m_ExecuteLock.Leave();
289             }
290             // Executes a task.
291             task->Invoke();
292         }
293         else
294         {
295             m_ExecuteLock.Leave();
296         }
297     }
298 }
299 
ExecuteThreadFunc(SingleThreadPool * this_)300 void SingleThreadPool::ExecuteThreadFunc(SingleThreadPool* this_)
301 {
302     this_->ExecuteThreadFunc();
303 }
304 
AddToExecuteQueue(QueueableTask * task)305 void SingleThreadPool::AddToExecuteQueue(QueueableTask* task)
306 {
307     {
308         nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
309 
310         m_ExecuteQueue.Enqueue(task);
311     }
312     // Wakes up the worker thread to run tasks.
313     m_ExecuteEvent.Signal();
314 }
315 
316 }} // namespace nn::os
317 
318 #include <new>
319 using namespace nn::os;
320 using namespace nn::os::detail;
321 
nnosThreadPoolTaskInitialize(nnosThreadPoolTask * p,void (* f)(uptr),uptr param)322 NN_EXTERN_C void nnosThreadPoolTaskInitialize(nnosThreadPoolTask* p, void (*f)(uptr), uptr param)
323 {
324     new (p) ThreadPoolTaskForC(f, param);
325 }
326 
nnosThreadPoolTaskFinalize(nnosThreadPoolTask * p)327 void nnosThreadPoolTaskFinalize(nnosThreadPoolTask* p)
328 {
329     ThreadPoolTaskForC* pThreadPoolTaskForC = reinterpret_cast<ThreadPoolTaskForC*>(p);
330     pThreadPoolTaskForC->~ThreadPoolTaskForC();
331 }
332 
nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask * p,nnosWaitObject * waitObject,void (* f)(uptr),uptr param)333 NN_EXTERN_C void nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask* p, nnosWaitObject* waitObject, void (*f)(uptr), uptr param)
334 {
335     new (p) ThreadPoolWaitTaskForC(waitObject, f, param);
336 }
337 
nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask * p)338 void nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask* p)
339 {
340     ThreadPoolWaitTaskForC* pThreadPoolWaitTaskForC = reinterpret_cast<ThreadPoolWaitTaskForC*>(p);
341     pThreadPoolWaitTaskForC->~ThreadPoolWaitTaskForC();
342 }
343 
nnosThreadPoolInitialize(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,uptr workerStackBottoms[],s32 workerPriority)344 void nnosThreadPoolInitialize(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, uptr workerStackBottoms[], s32 workerPriority)
345 {
346     ThreadPool* pThreadPool = new (p) ThreadPool();
347     pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, workerStackBottoms, workerPriority);
348 }
349 
350 #if NN_PLATFORM_HAS_MMU
nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,nnosStackMemoryBlock workerStacks[],s32 workerPriority)351 void nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, nnosStackMemoryBlock workerStacks[], s32 workerPriority)
352 {
353     ThreadPool* pThreadPool = new (p) ThreadPool();
354     StackMemoryBlock* pWorkerStacks = reinterpret_cast<StackMemoryBlock*>(workerStacks);
355     pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, pWorkerStacks, workerPriority);
356 }
357 #endif  // if NN_PLATFORM_HAS_MMU
358 
nnosThreadPoolFinalize(nnosThreadPool * p)359 void nnosThreadPoolFinalize(nnosThreadPool* p)
360 {
361     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
362     pThreadPool->Finalize();
363 }
364 
nnosThreadPoolAddWaitTask(nnosThreadPool * p,nnosThreadPoolWaitTask * task)365 void nnosThreadPoolAddWaitTask(nnosThreadPool* p, nnosThreadPoolWaitTask* task)
366 {
367     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
368     ThreadPoolWaitTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolWaitTaskForC*>(task);
369     pThreadPool->AddWaitTask(pThreadPoolTask);
370 }
371 
nnosThreadPoolAddTask(nnosThreadPool * p,nnosThreadPoolTask * task)372 void nnosThreadPoolAddTask(nnosThreadPool* p, nnosThreadPoolTask* task)
373 {
374     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
375     ThreadPoolTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolTaskForC*>(task);
376     pThreadPool->AddTask(pThreadPoolTask);
377 }
378 
nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects,size_t numWorkerThreads)379 size_t nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects, size_t numWorkerThreads)
380 {
381     return ThreadPool::GetWorkBufferSize(numMaxWaitObjects, numWorkerThreads);
382 }
383 
384