/*---------------------------------------------------------------------------* Project: Horizon File: os_BlockingQueue.cpp Copyright (C)2009 Nintendo Co., Ltd. All rights reserved. These coded instructions, statements, and computer programs contain proprietary information of Nintendo of America Inc. and/or Nintendo Company Ltd., and are protected by Federal copyright law. They may not be disclosed to third parties or copied or duplicated in any form, in whole or in part, without the prior written consent of Nintendo. $Rev: 25344 $ *---------------------------------------------------------------------------*/ #include #include #include #include #include #include #include #include //--------------------------------------------------------------------------- using namespace nn; using namespace nn::fnd; using namespace nn::svc; using namespace nn::os; using namespace nn::util; namespace nn{ namespace os{ namespace detail { template BlockingQueueBase::~BlockingQueueBase() { NN_TASSERT_(m_WaitingEnqueueCount == 0 && m_WaitingDequeueCount == 0); Finalize(); } template void BlockingQueueBase::Initialize(uptr buffer[], size_t size) { m_ppBuffer = buffer; m_size = size; m_firstIndex = 0; m_usedCount = 0; m_WaitingEnqueueCount = 0; m_WaitingDequeueCount = 0; m_EnqueueSemaphore.Initialize(0, INT_MAX); m_DequeueSemaphore.Initialize(0, INT_MAX); m_cs.Initialize(); } template nn::Result BlockingQueueBase::TryInitialize(uptr buffer[], size_t size) { m_ppBuffer = buffer; m_size = size; m_firstIndex = 0; m_usedCount = 0; m_WaitingEnqueueCount = 0; m_WaitingDequeueCount = 0; m_EnqueueSemaphore.Initialize(0, INT_MAX); m_DequeueSemaphore.Initialize(0, INT_MAX); NN_UTIL_RETURN_IF_FAILED(m_cs.TryInitialize()); return ResultSuccess(); } template void BlockingQueueBase::Finalize() { m_cs.Finalize(); m_DequeueSemaphore.Finalize(); m_EnqueueSemaphore.Finalize(); } template inline void BlockingQueueBase::NotifyEnqueue() const { if (m_WaitingEnqueueCount > 0) { m_EnqueueSemaphore.Release(); } } template inline void BlockingQueueBase::NotifyDequeue() const { if (m_WaitingDequeueCount > 0) { m_DequeueSemaphore.Release(); } } template bool BlockingQueueBase::TryEnqueue(uptr data) { // キューにデータ挿入中に他のスレッドから操作されないようにロックします。 ScopedLock locker(m_cs); if (m_size > m_usedCount) { s32 lastIndex = (m_firstIndex + m_usedCount) % m_size; m_ppBuffer[lastIndex] = data; m_usedCount++; // データ挿入待ちスレッドを起床します。 NotifyEnqueue(); return true; } else { return false; } } template bool BlockingQueueBase::ForceEnqueue(uptr data, uptr* pOut) { // キューにデータを挿入中に他のスレッドから操作されないようにロックします。 ScopedLock locker(m_cs); bool bReturn; s32 lastIndex = (m_firstIndex + m_usedCount) % m_size; if (m_size > m_usedCount) { m_usedCount++; bReturn = true; } else { if (pOut) { *pOut = m_ppBuffer[lastIndex]; } m_firstIndex = (m_firstIndex + 1) % m_size; bReturn = false; } m_ppBuffer[lastIndex] = data; // データ挿入待ちスレッドを起床します。 NotifyEnqueue(); return bReturn; } template void BlockingQueueBase::Enqueue(uptr data) { // キューに挿入処理中のスレッド数を更新します。 ++m_WaitingDequeueCount; for(;;) { if (TryEnqueue(data)) { break; } // キューに空きができるまで待機します。 m_DequeueSemaphore.Acquire(); } --m_WaitingDequeueCount; } template bool BlockingQueueBase::TryJam(uptr data) { // キューにデータを挿入中に他のスレッドから操作されないようにロックします。 ScopedLock locker(m_cs); if (m_size > m_usedCount) { m_firstIndex = (m_firstIndex + m_size - 1) % m_size; m_ppBuffer[m_firstIndex] = data; m_usedCount++; // データ挿入待ちスレッドを起床します。 NotifyEnqueue(); return true; } else { return false; } } template void BlockingQueueBase::Jam(uptr data) { // キューに挿入処理中のスレッド数を更新します。 ++m_WaitingDequeueCount; for(;;) { if (TryJam(data)) { break; } // キューに空きができるまで待機します。 m_DequeueSemaphore.Acquire(); } --m_WaitingDequeueCount; } template bool BlockingQueueBase::TryDequeue(uptr* pOut) { // キューからデータを取り出し中に他のスレッドから操作されないようにロックします。 ScopedLock locker(m_cs); if (0 < m_usedCount) { *pOut = m_ppBuffer[m_firstIndex]; m_firstIndex = (m_firstIndex + 1) % m_size; m_usedCount--; // キュー空き待ちスレッドを起床します。 NotifyDequeue(); return true; } else { return false; } } template uptr BlockingQueueBase::Dequeue() { // キューから取り出し処理中のスレッド数を更新します。 ++m_WaitingEnqueueCount; uptr data; for(;;) { if (TryDequeue(&data)) { break; } // キューの内容が空の場合は待機します。 m_EnqueueSemaphore.Acquire(); } --m_WaitingEnqueueCount; return data; } template bool BlockingQueueBase::TryGetFront(uptr* pOut) const { // キューからデータを取り出し中に他のスレッドから操作されないようにロックします。 ScopedLock locker(m_cs); if (0 < m_usedCount) { *pOut = m_ppBuffer[m_firstIndex]; return true; } else { return false; } } template uptr BlockingQueueBase::GetFront() const { // キューから取り出し処理中のスレッド数を更新します。 ++m_WaitingEnqueueCount; uptr data; for(;;) { if (TryGetFront(&data)) { break; } // キューの内容が空の場合は待機します。 m_EnqueueSemaphore.Acquire(); } --m_WaitingEnqueueCount; return data; } template class BlockingQueueBase; template class BlockingQueueBase; } // namespace detail }} // namespace nn::os #include using namespace nn::os; extern "C" { void nnosBlockingQueueInitialize(nnosBlockingQueue* this_, uptr buffer[], size_t size) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); new (pBlockingQueue) BlockingQueue(); pBlockingQueue->Initialize(buffer, size); } bool nnosBlockingQueueTryInitialize(nnosBlockingQueue* this_, uptr buffer[], size_t size) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); new (pBlockingQueue) BlockingQueue(); Result result = pBlockingQueue->TryInitialize(buffer, size); return result.IsSuccess(); } void nnosBlockingQueueFinalize(nnosBlockingQueue* this_) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); pBlockingQueue->Finalize(); pBlockingQueue->~BlockingQueue(); } bool nnosBlockingQueueTryEnqueue(nnosBlockingQueue* this_, uptr data) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->TryEnqueue(data); } void nnosBlockingQueueEnqueue(nnosBlockingQueue* this_, uptr data) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); pBlockingQueue->Enqueue(data); } bool nnosBlockingQueueTryJam(nnosBlockingQueue* this_, uptr data) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->TryJam(data); } void nnosBlockingQueueJam(nnosBlockingQueue* this_, uptr data) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); pBlockingQueue->Jam(data); } bool nnosBlockingQueueTryDequeue(nnosBlockingQueue* this_, uptr* pOut) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->TryDequeue(pOut); } uptr nnosBlockingQueueDequeue(nnosBlockingQueue* this_) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->Dequeue(); } bool nnosBlockingQueueTryGetFront(nnosBlockingQueue* this_, uptr* pOut) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->TryGetFront(pOut); } uptr nnosBlockingQueueGetFront(nnosBlockingQueue* this_) { BlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->GetFront(); } void nnosSafeBlockingQueueInitialize(nnosSafeBlockingQueue* this_, uptr buffer[], size_t size) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); new (pBlockingQueue) SafeBlockingQueue(); pBlockingQueue->Initialize(buffer, size); } bool nnosSafeBlockingQueueTryInitialize(nnosSafeBlockingQueue* this_, uptr buffer[], size_t size) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); new (pBlockingQueue) SafeBlockingQueue(); Result result = pBlockingQueue->TryInitialize(buffer, size); return result.IsSuccess(); } void nnosSafeBlockingQueueFinalize(nnosSafeBlockingQueue* this_) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); pBlockingQueue->Finalize(); pBlockingQueue->~SafeBlockingQueue(); } bool nnosSafeBlockingQueueTryEnqueue(nnosSafeBlockingQueue* this_, uptr data) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->TryEnqueue(data); } void nnosSafeBlockingQueueEnqueue(nnosSafeBlockingQueue* this_, uptr data) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); pBlockingQueue->Enqueue(data); } bool nnosSafeBlockingQueueTryJam(nnosSafeBlockingQueue* this_, uptr data) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->TryJam(data); } void nnosSafeBlockingQueueJam(nnosSafeBlockingQueue* this_, uptr data) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); pBlockingQueue->Jam(data); } bool nnosSafeBlockingQueueTryDequeue(nnosSafeBlockingQueue* this_, uptr* pOut) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->TryDequeue(pOut); } uptr nnosSafeBlockingQueueDequeue(nnosSafeBlockingQueue* this_) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->Dequeue(); } bool nnosSafeBlockingQueueTryGetFront(nnosSafeBlockingQueue* this_, uptr* pOut) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->TryGetFront(pOut); } uptr nnosSafeBlockingQueueGetFront(nnosSafeBlockingQueue* this_) { SafeBlockingQueue* pBlockingQueue = reinterpret_cast(this_); return pBlockingQueue->GetFront(); } }