1 /*---------------------------------------------------------------------------*
2   Project:  Horizon
3   File:     os_ThreadPool.cpp
4   Copyright (C)2009 Nintendo Co., Ltd.  All rights reserved.
5   These coded instructions, statements, and computer programs contain
6   proprietary information of Nintendo of America Inc. and/or Nintendo
7   Company Ltd., and are protected by Federal copyright law. They may
8   not be disclosed to third parties or copied or duplicated in any form,
9   in whole or in part, without the prior written consent of Nintendo.
10   $Rev: 34337 $
11  *---------------------------------------------------------------------------
12 
13 
14 */
15 
16 #include <nn/os/os_ThreadPool.h>
17 #include <nn/err.h>
18 #include <new>
19 
20 namespace nn { namespace os {
21 
GetThreads() const22 inline Thread* ThreadPool::GetThreads() const
23 {
24     return reinterpret_cast<Thread*>(m_Buffer);
25 }
26 
GetWaitHandleBuffer() const27 inline nn::Handle* ThreadPool::GetWaitHandleBuffer() const
28 {
29     return reinterpret_cast<nn::Handle*>(m_Buffer + sizeof(Thread) * m_NumThreads);
30 }
31 
GetWaitTaskBuffer() const32 inline QueueableWaitTask** ThreadPool::GetWaitTaskBuffer() const
33 {
34     return reinterpret_cast<QueueableWaitTask**>(m_Buffer + sizeof(Thread) * m_NumThreads + sizeof(nn::Handle) * (m_NumMaxWaitObjects + 1));
35 }
36 
GetWorkBufferSize(size_t numMaxWaitObjects,size_t numThreads)37 size_t ThreadPool::GetWorkBufferSize(size_t numMaxWaitObjects, size_t numThreads)
38 {
39     return sizeof(Thread) * numThreads
40          + sizeof(nn::Handle) * (numMaxWaitObjects + 1)
41          + sizeof(QueueableWaitTask*) * numMaxWaitObjects;
42 }
43 
InitializeCommon(size_t numMaxWaitObjects,size_t numThreads,void * workBuffer)44 inline void ThreadPool::InitializeCommon(size_t numMaxWaitObjects, size_t numThreads, void* workBuffer)
45 {
46     this->m_Finalizing = false;
47     this->m_WaitingCount = 0;
48     this->m_NumMaxWaitObjects = numMaxWaitObjects;
49     this->m_NumThreads = numThreads;
50     this->m_Buffer = reinterpret_cast<uptr>(workBuffer);
51     this->m_WaitEvent.Initialize(false);
52     this->m_ExecuteEvent.Initialize(false);
53     this->m_WaitLock.Initialize();
54     this->m_ExecuteLock.Initialize();
55 }
56 
57 namespace {
58     struct StackBufferAdapter
59     {
60         uptr stackBottom;
StackBufferAdapternn::os::__anon6efd3d550111::StackBufferAdapter61         StackBufferAdapter(uptr stackBottom) : stackBottom(stackBottom) {}
GetStackBottomnn::os::__anon6efd3d550111::StackBufferAdapter62         uptr GetStackBottom() const { return stackBottom; }
63     };
64 }
65 
66 // Starts the thread to manage tasks on standby.
StartWaitThread(s32 priority)67 inline void ThreadPool::StartWaitThread(s32 priority)
68 {
69     GetWaitHandleBuffer()[0] = this->m_WaitEvent.GetHandle();
70     m_WaitThread.Start(ThreadPool::WaitThreadFunc, this, m_WaitThreadStack, priority);
71 }
72 
73 // Starts worker thread to execute tasks.
StartExecuteThread(size_t i,uptr stackBottom,s32 priority)74 inline void ThreadPool::StartExecuteThread(size_t i, uptr stackBottom, s32 priority)
75 {
76     Thread* thread = new (GetThreads() + i) Thread();
77     StackBufferAdapter stack(stackBottom);
78     thread->Start(ThreadPool::ExecuteThreadFunc, this, stack, priority);
79 }
80 
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,uptr stackBottoms[],s32 workerPriority,s32 waitPriority)81 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, uptr stackBottoms[], s32 workerPriority, s32 waitPriority)
82 {
83     // TODO: Recommend NULL check of workBuffer.
84 
85     InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
86     for (size_t i = 0; i < numThreads; i++)
87     {
88         StartExecuteThread(i, stackBottoms[i], workerPriority);
89     }
90     StartWaitThread(waitPriority);
91 }
92 
93 #if NN_PLATFORM_HAS_MMU
Initialize(void * workBuffer,size_t numMaxWaitObjects,size_t numThreads,nn::os::StackMemoryBlock stacks[],s32 workerPriority,s32 waitPriority)94 void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, nn::os::StackMemoryBlock stacks[], s32 workerPriority, s32 waitPriority)
95 {
96     // TODO: Recommend NULL check of workBuffer.
97 
98     InitializeCommon(numMaxWaitObjects, numThreads, workBuffer);
99     for (size_t i = 0; i < numThreads; i++)
100     {
101         StartExecuteThread(i, stacks[i].GetStackBottom(), workerPriority);
102     }
103     StartWaitThread(waitPriority);
104 }
105 #endif  // if NN_PLATFORM_HAS_MMU
106 
Finalize()107 void ThreadPool::Finalize()
108 {
109     // Stops the task standby thread and worker thread.
110     this->m_Finalizing = true;
111 
112     m_WaitEvent.Signal();
113     m_ExecuteEvent.Signal();
114 
115     for (size_t i = 0; i < m_NumThreads; i++)
116     {
117         Thread& thread = GetThreads()[i];
118         thread.Join();
119         thread.Finalize();
120     }
121 
122     m_WaitThread.Join();
123     m_WaitThread.Finalize();
124 }
125 
AddToExecuteQueue(QueueableTask * task)126 void ThreadPool::AddToExecuteQueue(QueueableTask* task)
127 {
128     {
129         nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
130 
131         m_ExecuteQueue.Enqueue(task);
132     }
133 
134     // Wakes up the worker thread to execute tasks.
135     m_ExecuteEvent.Signal();
136 }
137 
AddToWaitQueue(QueueableWaitTask * task)138 void ThreadPool::AddToWaitQueue(QueueableWaitTask* task)
139 {
140     {
141         nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
142 
143         m_WaitQueue.Enqueue(task->AsNonWaitableTask());
144     }
145 
146     // Wakes up synchronization wait thread.
147     m_WaitEvent.Signal();
148 }
149 
150 // Loop processing for the synchronization wait thread.
WaitThreadFunc()151 inline void ThreadPool::WaitThreadFunc()
152 {
153     // The stack size of this function (WAIT_THREAD_STACK_SIZE) is fixed and small.
154     // Take note when revising the code.
155     // Revise the stack size if necessary.
156     while (!m_Finalizing)
157     {
158         {
159             nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
160 
161             // Adds new tasks added to the synchronization task queue to the synchronization wait handle buffer.
162             for (; m_WaitingCount < m_NumMaxWaitObjects && !m_WaitQueue.IsEmpty(); ++m_WaitingCount)
163             {
164                 QueueableWaitTask* task = GetWaitTaskPointer(m_WaitQueue.Dequeue());
165                 GetWaitTaskBuffer()[m_WaitingCount] = task;
166                 // Specifies a value of 1 or greater in the index because a 0 value for the synchronization wait handle buffer index is used to notify that a task has been added.
167                 //
168                 GetWaitHandleBuffer()[m_WaitingCount + 1] = task->GetWaitObject()->GetHandle();
169             }
170         }
171 
172         // Waits until one of the handles waiting for synchronization enters a signal state.
173         s32 n;
174         NN_ERR_THROW_FATAL(nn::svc::WaitSynchronization(&n, GetWaitHandleBuffer(), m_WaitingCount + 1, false, WAIT_INFINITE));
175         // Does not process when a synchronization task is added to the queue (n == 0).
176         if (n >= 1)
177         {
178             // Move tasks that have completed synchronization wait to the task execution buffer.
179             QueueableTask* task;
180             {
181                 nn::os::CriticalSection::ScopedLock locker(m_WaitLock);
182 
183                 task = GetWaitTaskBuffer()[n - 1]->AsNonWaitableTask();
184                 GetWaitTaskBuffer()[n - 1] = GetWaitTaskBuffer()[m_WaitingCount - 1];
185                 GetWaitHandleBuffer()[n] = GetWaitHandleBuffer()[m_WaitingCount];
186                 --m_WaitingCount;
187             }
188             AddToExecuteQueue(task);
189         }
190     }
191 }
192 
WaitThreadFunc(ThreadPool * this_)193 void ThreadPool::WaitThreadFunc(ThreadPool* this_)
194 {
195     this_->WaitThreadFunc();
196 }
197 
198 // Loop processing for the task execution worker thread.
ExecuteThreadFunc()199 inline void ThreadPool::ExecuteThreadFunc()
200 {
201     while (!m_Finalizing)
202     {
203         // If notification of task execution is received, one of the worker threads on standby executes a task.
204         m_ExecuteEvent.Wait();
205         if (m_Finalizing)
206         {
207             break;
208         }
209         // Locks so that the execution task queue content cannot be read or written while obtaining a task.
210         m_ExecuteLock.Enter();
211         if (QueueableTask* task = m_ExecuteQueue.Dequeue())
212         {
213             if (!m_ExecuteQueue.IsEmpty())
214             {
215                 m_ExecuteLock.Leave();
216                 // If other tasks exist in the execution task queue, another worker thread is awakened and a task is executed.
217                 m_ExecuteEvent.Signal();
218             }
219             else
220             {
221                 m_ExecuteLock.Leave();
222             }
223             // Executes a task.
224             task->Invoke();
225         }
226         else
227         {
228             m_ExecuteLock.Leave();
229         }
230     }
231 
232     // Wakes another worker thread, and executes the finalization process.
233     m_ExecuteEvent.Signal();
234 }
235 
ExecuteThreadFunc(ThreadPool * this_)236 void ThreadPool::ExecuteThreadFunc(ThreadPool* this_)
237 {
238     this_->ExecuteThreadFunc();
239 }
240 
241 // -----------------------------------------------------------------------
242 
243 
244 
Initialize(uptr stackBottom,s32 priority)245 void SingleThreadPool::Initialize(uptr stackBottom, s32 priority)
246 {
247     this->m_ExecuteEvent.Initialize(false);
248     this->m_ExecuteLock.Initialize();
249     StackBufferAdapter stack(stackBottom);
250     m_WorkerThread.Start(SingleThreadPool::ExecuteThreadFunc, this, stack, priority);
251 }
252 
Finalize()253 void SingleThreadPool::Finalize()
254 {
255     // Stops the task standby thread and worker thread.
256     if(!m_Finalizing)
257     {
258         this->m_Finalizing = true;
259         m_ExecuteEvent.Signal();
260         m_WorkerThread.Join();
261         m_WorkerThread.Finalize();
262     }
263 }
264 
265 // Loop processing for the task execution worker thread.
ExecuteThreadFunc()266 inline void SingleThreadPool::ExecuteThreadFunc()
267 {
268     while (!m_Finalizing)
269     {
270         // If notification of task execution is received, one of the worker threads on standby executes a task.
271         m_ExecuteEvent.Wait();
272         if (m_Finalizing)
273         {
274             break;
275         }
276         // Locks so that the execution task queue content cannot be read or written while obtaining a task.
277         m_ExecuteLock.Enter();
278         if (QueueableTask* task = m_ExecuteQueue.Dequeue())
279         {
280             if (!m_ExecuteQueue.IsEmpty())
281             {
282                 m_ExecuteLock.Leave();
283                 // If another task exists in the execution task queue, an event notification is made so that the next execution can be done immediately
284                 m_ExecuteEvent.Signal();
285             }
286             else
287             {
288                 m_ExecuteLock.Leave();
289             }
290             // Executes a task.
291             task->Invoke();
292         }
293         else
294         {
295             m_ExecuteLock.Leave();
296         }
297     }
298 }
299 
ExecuteThreadFunc(SingleThreadPool * this_)300 void SingleThreadPool::ExecuteThreadFunc(SingleThreadPool* this_)
301 {
302     this_->ExecuteThreadFunc();
303 }
304 
AddToExecuteQueue(QueueableTask * task)305 void SingleThreadPool::AddToExecuteQueue(QueueableTask* task)
306 {
307     {
308         nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock);
309 
310         m_ExecuteQueue.Enqueue(task);
311     }
312     // Wakes up the worker thread to execute tasks.
313     m_ExecuteEvent.Signal();
314 }
315 
316 }} // namespace nn::os
317 
318 #include <new>
319 using namespace nn::os;
320 using namespace nn::os::detail;
321 
nnosThreadPoolTaskInitialize(nnosThreadPoolTask * p,void (* f)(uptr),uptr param)322 NN_EXTERN_C void nnosThreadPoolTaskInitialize(nnosThreadPoolTask* p, void (*f)(uptr), uptr param)
323 {
324     new (p) ThreadPoolTaskForC(f, param);
325 }
326 
nnosThreadPoolTaskFinalize(nnosThreadPoolTask * p)327 void nnosThreadPoolTaskFinalize(nnosThreadPoolTask* p)
328 {
329     ThreadPoolTaskForC* pThreadPoolTaskForC = reinterpret_cast<ThreadPoolTaskForC*>(p);
330     pThreadPoolTaskForC->~ThreadPoolTaskForC();
331 }
332 
nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask * p,nnosWaitObject * waitObject,void (* f)(uptr),uptr param)333 NN_EXTERN_C void nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask* p, nnosWaitObject* waitObject, void (*f)(uptr), uptr param)
334 {
335     new (p) ThreadPoolWaitTaskForC(waitObject, f, param);
336 }
337 
nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask * p)338 void nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask* p)
339 {
340     ThreadPoolWaitTaskForC* pThreadPoolWaitTaskForC = reinterpret_cast<ThreadPoolWaitTaskForC*>(p);
341     pThreadPoolWaitTaskForC->~ThreadPoolWaitTaskForC();
342 }
343 
nnosThreadPoolInitialize(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,uptr workerStackBottoms[],s32 workerPriority)344 void nnosThreadPoolInitialize(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, uptr workerStackBottoms[], s32 workerPriority)
345 {
346     ThreadPool* pThreadPool = new (p) ThreadPool();
347     pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, workerStackBottoms, workerPriority);
348 }
349 
350 #if NN_PLATFORM_HAS_MMU
nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool * p,void * workBuffer,size_t numMaxWaitObjects,size_t numWorkerThreads,nnosStackMemoryBlock workerStacks[],s32 workerPriority)351 void nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, nnosStackMemoryBlock workerStacks[], s32 workerPriority)
352 {
353     ThreadPool* pThreadPool = new (p) ThreadPool();
354     StackMemoryBlock* pWorkerStacks = reinterpret_cast<StackMemoryBlock*>(workerStacks);
355     pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, pWorkerStacks, workerPriority);
356 }
357 #endif  // if NN_PLATFORM_HAS_MMU
358 
nnosThreadPoolFinalize(nnosThreadPool * p)359 void nnosThreadPoolFinalize(nnosThreadPool* p)
360 {
361     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
362     pThreadPool->Finalize();
363 }
364 
nnosThreadPoolAddWaitTask(nnosThreadPool * p,nnosThreadPoolWaitTask * task)365 void nnosThreadPoolAddWaitTask(nnosThreadPool* p, nnosThreadPoolWaitTask* task)
366 {
367     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
368     ThreadPoolWaitTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolWaitTaskForC*>(task);
369     pThreadPool->AddWaitTask(pThreadPoolTask);
370 }
371 
nnosThreadPoolAddTask(nnosThreadPool * p,nnosThreadPoolTask * task)372 void nnosThreadPoolAddTask(nnosThreadPool* p, nnosThreadPoolTask* task)
373 {
374     ThreadPool* pThreadPool = reinterpret_cast<ThreadPool*>(p);
375     ThreadPoolTaskForC* pThreadPoolTask = reinterpret_cast<ThreadPoolTaskForC*>(task);
376     pThreadPool->AddTask(pThreadPoolTask);
377 }
378 
nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects,size_t numWorkerThreads)379 size_t nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects, size_t numWorkerThreads)
380 {
381     return ThreadPool::GetWorkBufferSize(numMaxWaitObjects, numWorkerThreads);
382 }
383 
384