/*---------------------------------------------------------------------------* Project: Horizon File: os_ThreadPool.cpp Copyright (C)2009 Nintendo Co., Ltd. All rights reserved. These coded instructions, statements, and computer programs contain proprietary information of Nintendo of America Inc. and/or Nintendo Company Ltd., and are protected by Federal copyright law. They may not be disclosed to third parties or copied or duplicated in any form, in whole or in part, without the prior written consent of Nintendo. $Rev: 29304 $ *---------------------------------------------------------------------------*/ #include #include #include namespace nn { namespace os { inline Thread* ThreadPool::GetThreads() const { return reinterpret_cast(m_Buffer); } inline nn::Handle* ThreadPool::GetWaitHandleBuffer() const { return reinterpret_cast(m_Buffer + sizeof(Thread) * m_NumThreads); } inline QueueableWaitTask** ThreadPool::GetWaitTaskBuffer() const { return reinterpret_cast(m_Buffer + sizeof(Thread) * m_NumThreads + sizeof(nn::Handle) * (m_NumMaxWaitObjects + 1)); } size_t ThreadPool::GetWorkBufferSize(size_t numMaxWaitObjects, size_t numThreads) { return sizeof(Thread) * numThreads + sizeof(nn::Handle) * (numMaxWaitObjects + 1) + sizeof(QueueableWaitTask*) * numMaxWaitObjects; } inline void ThreadPool::InitializeCommon(size_t numMaxWaitObjects, size_t numThreads, void* workBuffer) { this->m_Finalizing = false; this->m_WaitingCount = 0; this->m_NumMaxWaitObjects = numMaxWaitObjects; this->m_NumThreads = numThreads; this->m_Buffer = reinterpret_cast(workBuffer); this->m_WaitEvent.Initialize(false); this->m_ExecuteEvent.Initialize(false); this->m_WaitLock.Initialize(); this->m_ExecuteLock.Initialize(); } namespace { struct StackBufferAdapter { uptr stackBottom; StackBufferAdapter(uptr stackBottom) : stackBottom(stackBottom) {} uptr GetStackBottom() const { return stackBottom; } }; } // 待機タスクを管理するスレッドを開始します。 inline void ThreadPool::StartWaitThread() { GetWaitHandleBuffer()[0] = this->m_WaitEvent.GetHandle(); m_WaitThread.Start(ThreadPool::WaitThreadFunc, this, m_WaitThreadStack, 0); } // タスクを実行するワーカースレッドを開始します。 inline void ThreadPool::StartExecuteThread(size_t i, uptr stackBottom, s32 priority) { Thread* thread = new (GetThreads() + i) Thread(); StackBufferAdapter stack(stackBottom); thread->Start(ThreadPool::ExecuteThreadFunc, this, stack, priority); } void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, uptr stackBottoms[], s32 priority) { // TODO: workBuffer の NULL チェックを推奨します。 InitializeCommon(numMaxWaitObjects, numThreads, workBuffer); for (size_t i = 0; i < numThreads; i++) { StartExecuteThread(i, stackBottoms[i], priority); } StartWaitThread(); } #if NN_PLATFORM_HAS_MMU void ThreadPool::Initialize(void* workBuffer, size_t numMaxWaitObjects, size_t numThreads, nn::os::StackMemoryBlock stacks[], s32 priority) { // TODO: workBuffer の NULL チェックを推奨します。 InitializeCommon(numMaxWaitObjects, numThreads, workBuffer); for (size_t i = 0; i < numThreads; i++) { StartExecuteThread(i, stacks[i].GetStackBottom(), priority); } StartWaitThread(); } #endif // if NN_PLATFORM_HAS_MMU void ThreadPool::Finalize() { // タスク待機スレッド、ワーカースレッドを停止します。 this->m_Finalizing = true; m_WaitEvent.Signal(); m_ExecuteEvent.Signal(); for (size_t i = 0; i < m_NumThreads; i++) { Thread& thread = GetThreads()[i]; thread.Join(); thread.Finalize(); } m_WaitThread.Join(); m_WaitThread.Finalize(); } void ThreadPool::AddToExecuteQueue(QueueableTask* task) { { nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock); m_ExecuteQueue.Enqueue(task); } // タスクを実行するワーカースレッドを起床します。 m_ExecuteEvent.Signal(); } void ThreadPool::AddToWaitQueue(QueueableWaitTask* task) { { nn::os::CriticalSection::ScopedLock locker(m_WaitLock); m_WaitQueue.Enqueue(task->AsNonWaitableTask()); } // 同期待ちスレッドを起床します。 m_WaitEvent.Signal(); } // 同期待ちスレッドでのループ処理です。 inline void ThreadPool::WaitThreadFunc() { // この関数のスタックサイズ(WAIT_THREAD_STACK_SIZE)は固定で小さめに取っている。 // コードを修正する際に注意すること。 // 必要であれば、スタックサイズを修正する。 while (!m_Finalizing) { { nn::os::CriticalSection::ScopedLock locker(m_WaitLock); // 新しく同期タスクキューに追加されたタスクを同期待ちハンドルバッファに追加します。 for (; m_WaitingCount < m_NumMaxWaitObjects && !m_WaitQueue.IsEmpty(); ++m_WaitingCount) { QueueableWaitTask* task = GetWaitTaskPointer(m_WaitQueue.Dequeue()); GetWaitTaskBuffer()[m_WaitingCount] = task; // 同期待ちハンドルバッファのインデックス 0 はタスクが追加されたことを // 通知するために利用するのでインデックスには 1 以上の値を指定します。 GetWaitHandleBuffer()[m_WaitingCount + 1] = task->GetWaitObject()->GetHandle(); } } // 同期待ち中のハンドルのいずれかがシグナル状態になるまで待ちます。 s32 n; NN_ERR_THROW_FATAL(nn::svc::WaitSynchronization(&n, GetWaitHandleBuffer(), m_WaitingCount + 1, false, WAIT_INFINITE)); // キューに同期タスクが追加されたとき (n == 0) は処理は行いません。 if (n >= 1) { // 同期待ちが完了したタスクをタスク実行バッファに移します。 QueueableTask* task; { nn::os::CriticalSection::ScopedLock locker(m_WaitLock); task = GetWaitTaskBuffer()[n - 1]->AsNonWaitableTask(); GetWaitTaskBuffer()[n - 1] = GetWaitTaskBuffer()[m_WaitingCount - 1]; GetWaitHandleBuffer()[n] = GetWaitHandleBuffer()[m_WaitingCount]; --m_WaitingCount; } AddToExecuteQueue(task); } } } void ThreadPool::WaitThreadFunc(ThreadPool* this_) { this_->WaitThreadFunc(); } // タスク実行ワーカースレッドのループ処理です。 inline void ThreadPool::ExecuteThreadFunc() { while (!m_Finalizing) { // タスク実行通知がくれば、待機中のワーカースレッドのいずれかがタスクを実行します。 m_ExecuteEvent.Wait(); if (m_Finalizing) { break; } // タスク取得中に実行タスクキューの内容を読み書きされないようにロックします。 m_ExecuteLock.Enter(); if (QueueableTask* task = m_ExecuteQueue.Dequeue()) { if (!m_ExecuteQueue.IsEmpty()) { m_ExecuteLock.Leave(); // 実行タスクキューに他のタスクが存在すれば、他のワーカースレッドを起床してタスクを実行させます。 m_ExecuteEvent.Signal(); } else { m_ExecuteLock.Leave(); } // タスクを実行します。 task->Invoke(); } else { m_ExecuteLock.Leave(); } } // 他のワーカースレッドを起床し、終了処理を実行させます。 m_ExecuteEvent.Signal(); } void ThreadPool::ExecuteThreadFunc(ThreadPool* this_) { this_->ExecuteThreadFunc(); } // ----------------------------------------------------------------------- void SingleThreadPool::Initialize(uptr stackBottom, s32 priority) { this->m_ExecuteEvent.Initialize(false); this->m_ExecuteLock.Initialize(); StackBufferAdapter stack(stackBottom); m_WorkerThread.Start(SingleThreadPool::ExecuteThreadFunc, this, stack, priority); } void SingleThreadPool::Finalize() { // タスク待機スレッド、ワーカースレッドを停止します。 if(!m_Finalizing) { this->m_Finalizing = true; m_ExecuteEvent.Signal(); m_WorkerThread.Join(); m_WorkerThread.Finalize(); } } // タスク実行ワーカースレッドのループ処理です。 inline void SingleThreadPool::ExecuteThreadFunc() { while (!m_Finalizing) { // タスク実行通知がくれば、待機中のワーカースレッドのいずれかがタスクを実行します。 m_ExecuteEvent.Wait(); if (m_Finalizing) { break; } // タスク取得中に実行タスクキューの内容を読み書きされないようにロックします。 m_ExecuteLock.Enter(); if (QueueableTask* task = m_ExecuteQueue.Dequeue()) { if (!m_ExecuteQueue.IsEmpty()) { m_ExecuteLock.Leave(); // 実行タスクキューに他のタスクが存在すれば、即座に次を実行できるようにイベントを通知しておく m_ExecuteEvent.Signal(); } else { m_ExecuteLock.Leave(); } // タスクを実行します。 task->Invoke(); } else { m_ExecuteLock.Leave(); } } } void SingleThreadPool::ExecuteThreadFunc(SingleThreadPool* this_) { this_->ExecuteThreadFunc(); } void SingleThreadPool::AddToExecuteQueue(QueueableTask* task) { { nn::os::CriticalSection::ScopedLock locker(m_ExecuteLock); m_ExecuteQueue.Enqueue(task); } // タスクを実行するワーカースレッドを起床します。 m_ExecuteEvent.Signal(); } }} // namespace nn::os #include using namespace nn::os; using namespace nn::os::detail; NN_EXTERN_C void nnosThreadPoolTaskInitialize(nnosThreadPoolTask* p, void (*f)(uptr), uptr param) { new (p) ThreadPoolTaskForC(f, param); } void nnosThreadPoolTaskFinalize(nnosThreadPoolTask* p) { ThreadPoolTaskForC* pThreadPoolTaskForC = reinterpret_cast(p); pThreadPoolTaskForC->~ThreadPoolTaskForC(); } NN_EXTERN_C void nnosThreadPoolWaitTaskInitialize(nnosThreadPoolWaitTask* p, nnosWaitObject* waitObject, void (*f)(uptr), uptr param) { new (p) ThreadPoolWaitTaskForC(waitObject, f, param); } void nnosThreadPoolWaitTaskFinalize(nnosThreadPoolWaitTask* p) { ThreadPoolWaitTaskForC* pThreadPoolWaitTaskForC = reinterpret_cast(p); pThreadPoolWaitTaskForC->~ThreadPoolWaitTaskForC(); } void nnosThreadPoolInitialize(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, uptr workerStackBottoms[], s32 workerPriority) { ThreadPool* pThreadPool = new (p) ThreadPool(); pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, workerStackBottoms, workerPriority); } #if NN_PLATFORM_HAS_MMU void nnosThreadPoolInitializeWithStackMemoryBlock(nnosThreadPool* p, void* workBuffer, size_t numMaxWaitObjects, size_t numWorkerThreads, nnosStackMemoryBlock workerStacks[], s32 workerPriority) { ThreadPool* pThreadPool = new (p) ThreadPool(); StackMemoryBlock* pWorkerStacks = reinterpret_cast(workerStacks); pThreadPool->Initialize(workBuffer, numMaxWaitObjects, numWorkerThreads, pWorkerStacks, workerPriority); } #endif // if NN_PLATFORM_HAS_MMU void nnosThreadPoolFinalize(nnosThreadPool* p) { ThreadPool* pThreadPool = reinterpret_cast(p); pThreadPool->Finalize(); } void nnosThreadPoolAddWaitTask(nnosThreadPool* p, nnosThreadPoolWaitTask* task) { ThreadPool* pThreadPool = reinterpret_cast(p); ThreadPoolWaitTaskForC* pThreadPoolTask = reinterpret_cast(task); pThreadPool->AddWaitTask(pThreadPoolTask); } void nnosThreadPoolAddTask(nnosThreadPool* p, nnosThreadPoolTask* task) { ThreadPool* pThreadPool = reinterpret_cast(p); ThreadPoolTaskForC* pThreadPoolTask = reinterpret_cast(task); pThreadPool->AddTask(pThreadPoolTask); } size_t nnosThreadPoolGetWorkBufferSize(size_t numMaxWaitObjects, size_t numWorkerThreads) { return ThreadPool::GetWorkBufferSize(numMaxWaitObjects, numWorkerThreads); }