1 /*---------------------------------------------------------------------------*
2   Project:  Horizon
3   File:     os_BlockingQueue.cpp
4 
5   Copyright (C)2009 Nintendo Co., Ltd.  All rights reserved.
6 
7   These coded instructions, statements, and computer programs contain
8   proprietary information of Nintendo of America Inc. and/or Nintendo
9   Company Ltd., and are protected by Federal copyright law.  They may
10   not be disclosed to third parties or copied or duplicated in any form,
11   in whole or in part, without the prior written consent of Nintendo.
12 
13   $Rev: 28736 $
14  *---------------------------------------------------------------------------*/
15 
16 #include <nn/assert.h>
17 #include <nn/config.h>
18 #include <nn/os/os_BlockingQueue.h>
19 #include <nn/os/os_Mutex.h>
20 #include <nn/os/os_CriticalSection.h>
21 #include <nn/os/os_InterCoreCriticalSection.h>
22 #include <nn/fnd/fnd_Interlocked.h>
23 #include <nn/util/detail/util_InitializationTransaction.h>
24 
25 #include <climits>
26 
27 //---------------------------------------------------------------------------
28 
29 using namespace nn;
30 using namespace nn::fnd;
31 using namespace nn::svc;
32 using namespace nn::os;
33 using namespace nn::util;
34 
35 namespace nn{ namespace os{
36 
37 namespace detail {
38 
39 template <class Locker>
~BlockingQueueBase()40 BlockingQueueBase<Locker>::~BlockingQueueBase()
41 {
42     NN_TASSERT_(m_WaitingEnqueueCount == 0 && m_WaitingDequeueCount == 0);
43     Finalize();
44 }
45 
46 template <class Locker>
Initialize(uptr buffer[],size_t size)47 void BlockingQueueBase<Locker>::Initialize(uptr buffer[], size_t size)
48 {
49     m_ppBuffer      = buffer;
50     m_size          = size;
51     m_firstIndex    = 0;
52     m_usedCount     = 0;
53     m_WaitingEnqueueCount = 0;
54     m_WaitingDequeueCount = 0;
55     m_EnqueueSemaphore.Initialize(0, INT_MAX);
56     m_DequeueSemaphore.Initialize(0, INT_MAX);
57     m_cs.Initialize();
58 }
59 
60 template <class Locker>
TryInitialize(uptr buffer[],size_t size)61 nn::Result BlockingQueueBase<Locker>::TryInitialize(uptr buffer[], size_t size)
62 {
63     m_ppBuffer      = buffer;
64     m_size          = size;
65     m_firstIndex    = 0;
66     m_usedCount     = 0;
67     m_WaitingEnqueueCount = 0;
68     m_WaitingDequeueCount = 0;
69 
70     m_EnqueueSemaphore.Initialize(0, INT_MAX);
71     m_DequeueSemaphore.Initialize(0, INT_MAX);
72 
73     NN_UTIL_RETURN_IF_FAILED(m_cs.TryInitialize());
74     return ResultSuccess();
75 }
76 
77 template <class Locker>
Finalize()78 void BlockingQueueBase<Locker>::Finalize()
79 {
80     m_cs.Finalize();
81     m_DequeueSemaphore.Finalize();
82     m_EnqueueSemaphore.Finalize();
83 }
84 
85 
86 template <class Locker>
NotifyEnqueue() const87 inline void BlockingQueueBase<Locker>::NotifyEnqueue() const
88 {
89     if (m_WaitingEnqueueCount > 0)
90     {
91         m_EnqueueSemaphore.Release();
92     }
93 }
94 
95 template <class Locker>
NotifyDequeue() const96 inline void BlockingQueueBase<Locker>::NotifyDequeue() const
97 {
98     if (m_WaitingDequeueCount > 0)
99     {
100         m_DequeueSemaphore.Release();
101     }
102 }
103 
104 template <class Locker>
TryEnqueue(uptr data)105 bool BlockingQueueBase<Locker>::TryEnqueue(uptr data)
106 {
107     // キューにデータ挿入中に他のスレッドから操作されないようにロックします。
108     ScopedLock locker(m_cs);
109 
110     if (m_size > m_usedCount)
111     {
112         s32 lastIndex = (m_firstIndex + m_usedCount) % m_size;
113         m_ppBuffer[lastIndex] = data;
114         m_usedCount++;
115 
116         // データ挿入待ちスレッドを起床します。
117         NotifyEnqueue();
118         return true;
119     }
120     else
121     {
122         return false;
123     }
124 }
125 
126 template <class Locker>
ForceEnqueue(uptr data,uptr * pOut)127 bool BlockingQueueBase<Locker>::ForceEnqueue(uptr data, uptr* pOut)
128 {
129     // キューにデータを挿入中に他のスレッドから操作されないようにロックします。
130     ScopedLock locker(m_cs);
131     bool bReturn;
132     s32 lastIndex = (m_firstIndex + m_usedCount) % m_size;
133     if (m_size > m_usedCount)
134     {
135         m_usedCount++;
136         bReturn = true;
137     }
138     else
139     {
140         if (pOut)
141         {
142             *pOut = m_ppBuffer[lastIndex];
143         }
144         m_firstIndex = (m_firstIndex + 1) % m_size;
145         bReturn = false;
146     }
147 
148     m_ppBuffer[lastIndex] = data;
149     // データ挿入待ちスレッドを起床します。
150     NotifyEnqueue();
151     return bReturn;
152 }
153 
154 template <class Locker>
Enqueue(uptr data)155 void BlockingQueueBase<Locker>::Enqueue(uptr data)
156 {
157     // キューに挿入処理中のスレッド数を更新します。
158     ++m_WaitingDequeueCount;
159     for(;;)
160     {
161         if (TryEnqueue(data))
162         {
163             break;
164         }
165 
166         // キューに空きができるまで待機します。
167         m_DequeueSemaphore.Acquire();
168     }
169     --m_WaitingDequeueCount;
170 }
171 
172 template <class Locker>
TryJam(uptr data)173 bool BlockingQueueBase<Locker>::TryJam(uptr data)
174 {
175     // キューにデータを挿入中に他のスレッドから操作されないようにロックします。
176     ScopedLock locker(m_cs);
177 
178     if (m_size > m_usedCount)
179     {
180         m_firstIndex = (m_firstIndex + m_size - 1) % m_size;
181         m_ppBuffer[m_firstIndex] = data;
182         m_usedCount++;
183 
184         // データ挿入待ちスレッドを起床します。
185         NotifyEnqueue();
186         return true;
187     }
188     else
189     {
190         return false;
191     }
192 }
193 
194 template <class Locker>
Jam(uptr data)195 void BlockingQueueBase<Locker>::Jam(uptr data)
196 {
197     // キューに挿入処理中のスレッド数を更新します。
198     ++m_WaitingDequeueCount;
199     for(;;)
200     {
201         if (TryJam(data))
202         {
203             break;
204         }
205 
206         // キューに空きができるまで待機します。
207         m_DequeueSemaphore.Acquire();
208     }
209     --m_WaitingDequeueCount;
210 }
211 
212 template <class Locker>
TryDequeue(uptr * pOut)213 bool BlockingQueueBase<Locker>::TryDequeue(uptr* pOut)
214 {
215     // キューからデータを取り出し中に他のスレッドから操作されないようにロックします。
216     ScopedLock locker(m_cs);
217 
218     if (0 < m_usedCount)
219     {
220         *pOut = m_ppBuffer[m_firstIndex];
221         m_firstIndex = (m_firstIndex + 1) % m_size;
222         m_usedCount--;
223 
224         // キュー空き待ちスレッドを起床します。
225         NotifyDequeue();
226         return true;
227     }
228     else
229     {
230         return false;
231     }
232 }
233 
234 template <class Locker>
Dequeue()235 uptr BlockingQueueBase<Locker>::Dequeue()
236 {
237     // キューから取り出し処理中のスレッド数を更新します。
238     ++m_WaitingEnqueueCount;
239     uptr data;
240     for(;;)
241     {
242         if (TryDequeue(&data))
243         {
244             break;
245         }
246 
247         // キューの内容が空の場合は待機します。
248         m_EnqueueSemaphore.Acquire();
249     }
250     --m_WaitingEnqueueCount;
251     return data;
252 }
253 
254 template <class Locker>
TryGetFront(uptr * pOut) const255 bool BlockingQueueBase<Locker>::TryGetFront(uptr* pOut) const
256 {
257     // キューからデータを取り出し中に他のスレッドから操作されないようにロックします。
258     ScopedLock locker(m_cs);
259 
260     if (0 < m_usedCount)
261     {
262         *pOut = m_ppBuffer[m_firstIndex];
263 
264         return true;
265     }
266     else
267     {
268         return false;
269     }
270 }
271 
272 template <class Locker>
GetFront() const273 uptr BlockingQueueBase<Locker>::GetFront() const
274 {
275     // キューから取り出し処理中のスレッド数を更新します。
276     ++m_WaitingEnqueueCount;
277     uptr data;
278     for(;;)
279     {
280         if (TryGetFront(&data))
281         {
282             break;
283         }
284 
285         // キューの内容が空の場合は待機します。
286         m_EnqueueSemaphore.Acquire();
287     }
288     --m_WaitingEnqueueCount;
289     return data;
290 }
291 
292 template class BlockingQueueBase<nn::os::CriticalSection>;
293 template class BlockingQueueBase<nn::os::InterCoreCriticalSection>;
294 template class BlockingQueueBase<nn::os::Mutex>;
295 
296 } // namespace detail
297 
298 }} // namespace nn::os
299 
300 
301 #include <new>
302 
303 using namespace nn::os;
304 
305 extern "C" {
306 
nnosBlockingQueueInitialize(nnosBlockingQueue * this_,uptr buffer[],size_t size)307 void nnosBlockingQueueInitialize(nnosBlockingQueue* this_, uptr buffer[], size_t size)
308 {
309     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
310     new (pBlockingQueue) BlockingQueue();
311     pBlockingQueue->Initialize(buffer, size);
312 }
313 
nnosBlockingQueueTryInitialize(nnosBlockingQueue * this_,uptr buffer[],size_t size)314 bool nnosBlockingQueueTryInitialize(nnosBlockingQueue* this_, uptr buffer[], size_t size)
315 {
316     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
317     new (pBlockingQueue) BlockingQueue();
318     Result result = pBlockingQueue->TryInitialize(buffer, size);
319     return result.IsSuccess();
320 }
321 
nnosBlockingQueueFinalize(nnosBlockingQueue * this_)322 void nnosBlockingQueueFinalize(nnosBlockingQueue* this_)
323 {
324     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
325     pBlockingQueue->Finalize();
326     pBlockingQueue->~BlockingQueue();
327 }
328 
nnosBlockingQueueTryEnqueue(nnosBlockingQueue * this_,uptr data)329 bool nnosBlockingQueueTryEnqueue(nnosBlockingQueue* this_, uptr data)
330 {
331     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
332     return pBlockingQueue->TryEnqueue(data);
333 }
334 
nnosBlockingQueueEnqueue(nnosBlockingQueue * this_,uptr data)335 void nnosBlockingQueueEnqueue(nnosBlockingQueue* this_, uptr data)
336 {
337     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
338     pBlockingQueue->Enqueue(data);
339 }
340 
nnosBlockingQueueTryJam(nnosBlockingQueue * this_,uptr data)341 bool nnosBlockingQueueTryJam(nnosBlockingQueue* this_, uptr data)
342 {
343     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
344     return pBlockingQueue->TryJam(data);
345 }
346 
nnosBlockingQueueJam(nnosBlockingQueue * this_,uptr data)347 void nnosBlockingQueueJam(nnosBlockingQueue* this_, uptr data)
348 {
349     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
350     pBlockingQueue->Jam(data);
351 }
352 
nnosBlockingQueueTryDequeue(nnosBlockingQueue * this_,uptr * pOut)353 bool nnosBlockingQueueTryDequeue(nnosBlockingQueue* this_, uptr* pOut)
354 {
355     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
356     return pBlockingQueue->TryDequeue(pOut);
357 }
358 
nnosBlockingQueueDequeue(nnosBlockingQueue * this_)359 uptr nnosBlockingQueueDequeue(nnosBlockingQueue* this_)
360 {
361     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
362     return pBlockingQueue->Dequeue();
363 }
364 
nnosBlockingQueueTryGetFront(nnosBlockingQueue * this_,uptr * pOut)365 bool nnosBlockingQueueTryGetFront(nnosBlockingQueue* this_, uptr* pOut)
366 {
367     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
368     return pBlockingQueue->TryGetFront(pOut);
369 }
370 
nnosBlockingQueueGetFront(nnosBlockingQueue * this_)371 uptr nnosBlockingQueueGetFront(nnosBlockingQueue* this_)
372 {
373     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
374     return pBlockingQueue->GetFront();
375 }
376 
nnosSafeBlockingQueueInitialize(nnosSafeBlockingQueue * this_,uptr buffer[],size_t size)377 void nnosSafeBlockingQueueInitialize(nnosSafeBlockingQueue* this_, uptr buffer[], size_t size)
378 {
379     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
380     new (pBlockingQueue) SafeBlockingQueue();
381     pBlockingQueue->Initialize(buffer, size);
382 }
383 
nnosSafeBlockingQueueTryInitialize(nnosSafeBlockingQueue * this_,uptr buffer[],size_t size)384 bool nnosSafeBlockingQueueTryInitialize(nnosSafeBlockingQueue* this_, uptr buffer[], size_t size)
385 {
386     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
387     new (pBlockingQueue) SafeBlockingQueue();
388     Result result = pBlockingQueue->TryInitialize(buffer, size);
389     return result.IsSuccess();
390 }
391 
nnosSafeBlockingQueueFinalize(nnosSafeBlockingQueue * this_)392 void nnosSafeBlockingQueueFinalize(nnosSafeBlockingQueue* this_)
393 {
394     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
395     pBlockingQueue->Finalize();
396     pBlockingQueue->~SafeBlockingQueue();
397 }
398 
nnosSafeBlockingQueueTryEnqueue(nnosSafeBlockingQueue * this_,uptr data)399 bool nnosSafeBlockingQueueTryEnqueue(nnosSafeBlockingQueue* this_, uptr data)
400 {
401     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
402     return pBlockingQueue->TryEnqueue(data);
403 }
404 
nnosSafeBlockingQueueEnqueue(nnosSafeBlockingQueue * this_,uptr data)405 void nnosSafeBlockingQueueEnqueue(nnosSafeBlockingQueue* this_, uptr data)
406 {
407     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
408     pBlockingQueue->Enqueue(data);
409 }
410 
nnosSafeBlockingQueueTryJam(nnosSafeBlockingQueue * this_,uptr data)411 bool nnosSafeBlockingQueueTryJam(nnosSafeBlockingQueue* this_, uptr data)
412 {
413     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
414     return pBlockingQueue->TryJam(data);
415 }
416 
nnosSafeBlockingQueueJam(nnosSafeBlockingQueue * this_,uptr data)417 void nnosSafeBlockingQueueJam(nnosSafeBlockingQueue* this_, uptr data)
418 {
419     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
420     pBlockingQueue->Jam(data);
421 }
422 
nnosSafeBlockingQueueTryDequeue(nnosSafeBlockingQueue * this_,uptr * pOut)423 bool nnosSafeBlockingQueueTryDequeue(nnosSafeBlockingQueue* this_, uptr* pOut)
424 {
425     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
426     return pBlockingQueue->TryDequeue(pOut);
427 }
428 
nnosSafeBlockingQueueDequeue(nnosSafeBlockingQueue * this_)429 uptr nnosSafeBlockingQueueDequeue(nnosSafeBlockingQueue* this_)
430 {
431     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
432     return pBlockingQueue->Dequeue();
433 }
434 
nnosSafeBlockingQueueTryGetFront(nnosSafeBlockingQueue * this_,uptr * pOut)435 bool nnosSafeBlockingQueueTryGetFront(nnosSafeBlockingQueue* this_, uptr* pOut)
436 {
437     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
438     return pBlockingQueue->TryGetFront(pOut);
439 }
440 
nnosSafeBlockingQueueGetFront(nnosSafeBlockingQueue * this_)441 uptr nnosSafeBlockingQueueGetFront(nnosSafeBlockingQueue* this_)
442 {
443     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
444     return pBlockingQueue->GetFront();
445 }
446 
447 }
448