1 /*---------------------------------------------------------------------------*
2   Project:  Horizon
3   File:     os_BlockingQueue.cpp
4 
5   Copyright (C)2009-2012 Nintendo Co., Ltd.  All rights reserved.
6 
7   These coded instructions, statements, and computer programs contain
8   proprietary information of Nintendo of America Inc. and/or Nintendo
9   Company Ltd., and are protected by Federal copyright law.  They may
10   not be disclosed to third parties or copied or duplicated in any form,
11   in whole or in part, without the prior written consent of Nintendo.
12 
13   $Rev: 46347 $
14  *---------------------------------------------------------------------------*/
15 
16 #include <nn/assert.h>
17 #include <nn/config.h>
18 #include <nn/os/os_BlockingQueue.h>
19 #include <nn/os/os_Mutex.h>
20 #include <nn/os/os_CriticalSection.h>
21 #include <nn/fnd/fnd_Interlocked.h>
22 #include <nn/util/detail/util_InitializationTransaction.h>
23 
24 #include <climits>
25 
26 //---------------------------------------------------------------------------
27 
28 using namespace nn;
29 using namespace nn::fnd;
30 using namespace nn::svc;
31 using namespace nn::os;
32 using namespace nn::util;
33 
34 namespace nn{ namespace os{
35 
36 namespace detail {
37 
38 template <class Locker>
~BlockingQueueBase()39 BlockingQueueBase<Locker>::~BlockingQueueBase()
40 {
41     NN_TASSERT_(m_WaitingEnqueueCount == 0 && m_WaitingDequeueCount == 0);
42     Finalize();
43 }
44 
45 template <class Locker>
Initialize(uptr buffer[],size_t size)46 void BlockingQueueBase<Locker>::Initialize(uptr buffer[], size_t size)
47 {
48     m_ppBuffer      = buffer;
49     m_size          = size;
50     m_firstIndex    = 0;
51     m_usedCount     = 0;
52     m_WaitingEnqueueCount = 0;
53     m_WaitingDequeueCount = 0;
54     m_EnqueueSemaphore.Initialize(0);
55     m_DequeueSemaphore.Initialize(0);
56     m_cs.Initialize();
57 }
58 
59 template <class Locker>
TryInitialize(uptr buffer[],size_t size)60 nn::Result BlockingQueueBase<Locker>::TryInitialize(uptr buffer[], size_t size)
61 {
62     m_ppBuffer      = buffer;
63     m_size          = size;
64     m_firstIndex    = 0;
65     m_usedCount     = 0;
66     m_WaitingEnqueueCount = 0;
67     m_WaitingDequeueCount = 0;
68 
69     m_EnqueueSemaphore.Initialize(0);
70     m_DequeueSemaphore.Initialize(0);
71 
72     NN_UTIL_RETURN_IF_FAILED(m_cs.TryInitialize());
73     return ResultSuccess();
74 }
75 
76 template <class Locker>
Finalize()77 void BlockingQueueBase<Locker>::Finalize()
78 {
79     m_cs.Finalize();
80     m_DequeueSemaphore.Finalize();
81     m_EnqueueSemaphore.Finalize();
82 }
83 
84 
85 template <class Locker>
NotifyEnqueue() const86 inline void BlockingQueueBase<Locker>::NotifyEnqueue() const
87 {
88     if (m_WaitingEnqueueCount > 0)
89     {
90         m_EnqueueSemaphore.Release();
91     }
92 }
93 
94 template <class Locker>
NotifyDequeue() const95 inline void BlockingQueueBase<Locker>::NotifyDequeue() const
96 {
97     if (m_WaitingDequeueCount > 0)
98     {
99         m_DequeueSemaphore.Release();
100     }
101 }
102 
103 template <class Locker>
TryEnqueue(uptr data)104 bool BlockingQueueBase<Locker>::TryEnqueue(uptr data)
105 {
106     // Locks while inserting data in the queue so that other threads cannot perform operations.
107     ScopedLock locker(m_cs);
108 
109     if (m_size > m_usedCount)
110     {
111         s32 lastIndex = (m_firstIndex + m_usedCount) % m_size;
112         m_ppBuffer[lastIndex] = data;
113         m_usedCount++;
114 
115         // Wakes up thread waiting for data insertion.
116         NotifyEnqueue();
117         return true;
118     }
119     else
120     {
121         return false;
122     }
123 }
124 
125 template <class Locker>
ForceEnqueue(uptr data,uptr * pOut)126 bool BlockingQueueBase<Locker>::ForceEnqueue(uptr data, uptr* pOut)
127 {
128     // Locks while inserting data in the queue so that other threads cannot perform operations.
129     ScopedLock locker(m_cs);
130     bool bReturn;
131     s32 lastIndex = (m_firstIndex + m_usedCount) % m_size;
132     if (m_size > m_usedCount)
133     {
134         m_usedCount++;
135         bReturn = true;
136     }
137     else
138     {
139         if (pOut)
140         {
141             *pOut = m_ppBuffer[lastIndex];
142         }
143         m_firstIndex = (m_firstIndex + 1) % m_size;
144         bReturn = false;
145     }
146 
147     m_ppBuffer[lastIndex] = data;
148     // Wakes up thread waiting for data insertion.
149     NotifyEnqueue();
150     return bReturn;
151 }
152 
153 template <class Locker>
Enqueue(uptr data)154 void BlockingQueueBase<Locker>::Enqueue(uptr data)
155 {
156     // Updates the number of threads during the process to insert in queue.
157     ++m_WaitingDequeueCount;
158     for(;;)
159     {
160         if (TryEnqueue(data))
161         {
162             break;
163         }
164 
165         // Waits until there is space in the queue.
166         m_DequeueSemaphore.Acquire();
167     }
168     --m_WaitingDequeueCount;
169 }
170 
171 template <class Locker>
TryJam(uptr data)172 bool BlockingQueueBase<Locker>::TryJam(uptr data)
173 {
174     // Locks while inserting data in the queue so that other threads cannot perform operations.
175     ScopedLock locker(m_cs);
176 
177     if (m_size > m_usedCount)
178     {
179         m_firstIndex = (m_firstIndex + m_size - 1) % m_size;
180         m_ppBuffer[m_firstIndex] = data;
181         m_usedCount++;
182 
183         // Wakes up thread waiting for data insertion.
184         NotifyEnqueue();
185         return true;
186     }
187     else
188     {
189         return false;
190     }
191 }
192 
193 template <class Locker>
Jam(uptr data)194 void BlockingQueueBase<Locker>::Jam(uptr data)
195 {
196     // Updates the number of threads during the process to insert in queue.
197     ++m_WaitingDequeueCount;
198     for(;;)
199     {
200         if (TryJam(data))
201         {
202             break;
203         }
204 
205         // Waits until there is space in the queue.
206         m_DequeueSemaphore.Acquire();
207     }
208     --m_WaitingDequeueCount;
209 }
210 
211 template <class Locker>
TryDequeue(uptr * pOut)212 bool BlockingQueueBase<Locker>::TryDequeue(uptr* pOut)
213 {
214     // Locks while retrieving data from the queue so that other threads cannot perform operations.
215     ScopedLock locker(m_cs);
216 
217     if (0 < m_usedCount)
218     {
219         *pOut = m_ppBuffer[m_firstIndex];
220         m_firstIndex = (m_firstIndex + 1) % m_size;
221         m_usedCount--;
222 
223         // Wakes up the thread waiting for a space in the queue.
224         NotifyDequeue();
225         return true;
226     }
227     else
228     {
229         return false;
230     }
231 }
232 
233 template <class Locker>
Dequeue()234 uptr BlockingQueueBase<Locker>::Dequeue()
235 {
236     // Updates the number of threads during the process to retrieve data from the queue.
237     ++m_WaitingEnqueueCount;
238     uptr data;
239     for(;;)
240     {
241         if (TryDequeue(&data))
242         {
243             break;
244         }
245 
246         // Waits when the queue content is empty.
247         m_EnqueueSemaphore.Acquire();
248     }
249     --m_WaitingEnqueueCount;
250     return data;
251 }
252 
253 template <class Locker>
TryGetFront(uptr * pOut) const254 bool BlockingQueueBase<Locker>::TryGetFront(uptr* pOut) const
255 {
256     // Locks while retrieving data from the queue so that other threads cannot perform operations.
257     ScopedLock locker(m_cs);
258 
259     if (0 < m_usedCount)
260     {
261         *pOut = m_ppBuffer[m_firstIndex];
262 
263         return true;
264     }
265     else
266     {
267         return false;
268     }
269 }
270 
271 template <class Locker>
GetFront() const272 uptr BlockingQueueBase<Locker>::GetFront() const
273 {
274     // Updates the number of threads during the process to retrieve data from the queue.
275     ++m_WaitingEnqueueCount;
276     uptr data;
277     for(;;)
278     {
279         if (TryGetFront(&data))
280         {
281             break;
282         }
283 
284         // Waits when the queue content is empty.
285         m_EnqueueSemaphore.Acquire();
286     }
287     --m_WaitingEnqueueCount;
288     return data;
289 }
290 
291 template class BlockingQueueBase<nn::os::CriticalSection>;
292 template class BlockingQueueBase<nn::os::Mutex>;
293 
294 } // namespace detail
295 
296 }} // namespace nn::os
297 
298 
299 #include <new>
300 
301 using namespace nn::os;
302 
303 extern "C" {
304 
nnosBlockingQueueInitialize(nnosBlockingQueue * this_,uptr buffer[],size_t size)305 void nnosBlockingQueueInitialize(nnosBlockingQueue* this_, uptr buffer[], size_t size)
306 {
307     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
308     new (pBlockingQueue) BlockingQueue();
309     pBlockingQueue->Initialize(buffer, size);
310 }
311 
nnosBlockingQueueTryInitialize(nnosBlockingQueue * this_,uptr buffer[],size_t size)312 bool nnosBlockingQueueTryInitialize(nnosBlockingQueue* this_, uptr buffer[], size_t size)
313 {
314     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
315     new (pBlockingQueue) BlockingQueue();
316     Result result = pBlockingQueue->TryInitialize(buffer, size);
317     return result.IsSuccess();
318 }
319 
nnosBlockingQueueFinalize(nnosBlockingQueue * this_)320 void nnosBlockingQueueFinalize(nnosBlockingQueue* this_)
321 {
322     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
323     pBlockingQueue->Finalize();
324     pBlockingQueue->~BlockingQueue();
325 }
326 
nnosBlockingQueueTryEnqueue(nnosBlockingQueue * this_,uptr data)327 bool nnosBlockingQueueTryEnqueue(nnosBlockingQueue* this_, uptr data)
328 {
329     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
330     return pBlockingQueue->TryEnqueue(data);
331 }
332 
nnosBlockingQueueEnqueue(nnosBlockingQueue * this_,uptr data)333 void nnosBlockingQueueEnqueue(nnosBlockingQueue* this_, uptr data)
334 {
335     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
336     pBlockingQueue->Enqueue(data);
337 }
338 
nnosBlockingQueueTryJam(nnosBlockingQueue * this_,uptr data)339 bool nnosBlockingQueueTryJam(nnosBlockingQueue* this_, uptr data)
340 {
341     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
342     return pBlockingQueue->TryJam(data);
343 }
344 
nnosBlockingQueueJam(nnosBlockingQueue * this_,uptr data)345 void nnosBlockingQueueJam(nnosBlockingQueue* this_, uptr data)
346 {
347     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
348     pBlockingQueue->Jam(data);
349 }
350 
nnosBlockingQueueTryDequeue(nnosBlockingQueue * this_,uptr * pOut)351 bool nnosBlockingQueueTryDequeue(nnosBlockingQueue* this_, uptr* pOut)
352 {
353     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
354     return pBlockingQueue->TryDequeue(pOut);
355 }
356 
nnosBlockingQueueDequeue(nnosBlockingQueue * this_)357 uptr nnosBlockingQueueDequeue(nnosBlockingQueue* this_)
358 {
359     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
360     return pBlockingQueue->Dequeue();
361 }
362 
nnosBlockingQueueTryGetFront(nnosBlockingQueue * this_,uptr * pOut)363 bool nnosBlockingQueueTryGetFront(nnosBlockingQueue* this_, uptr* pOut)
364 {
365     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
366     return pBlockingQueue->TryGetFront(pOut);
367 }
368 
nnosBlockingQueueGetFront(nnosBlockingQueue * this_)369 uptr nnosBlockingQueueGetFront(nnosBlockingQueue* this_)
370 {
371     BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
372     return pBlockingQueue->GetFront();
373 }
374 
nnosSafeBlockingQueueInitialize(nnosSafeBlockingQueue * this_,uptr buffer[],size_t size)375 void nnosSafeBlockingQueueInitialize(nnosSafeBlockingQueue* this_, uptr buffer[], size_t size)
376 {
377     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
378     new (pBlockingQueue) SafeBlockingQueue();
379     pBlockingQueue->Initialize(buffer, size);
380 }
381 
nnosSafeBlockingQueueTryInitialize(nnosSafeBlockingQueue * this_,uptr buffer[],size_t size)382 bool nnosSafeBlockingQueueTryInitialize(nnosSafeBlockingQueue* this_, uptr buffer[], size_t size)
383 {
384     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
385     new (pBlockingQueue) SafeBlockingQueue();
386     Result result = pBlockingQueue->TryInitialize(buffer, size);
387     return result.IsSuccess();
388 }
389 
nnosSafeBlockingQueueFinalize(nnosSafeBlockingQueue * this_)390 void nnosSafeBlockingQueueFinalize(nnosSafeBlockingQueue* this_)
391 {
392     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
393     pBlockingQueue->Finalize();
394     pBlockingQueue->~SafeBlockingQueue();
395 }
396 
nnosSafeBlockingQueueTryEnqueue(nnosSafeBlockingQueue * this_,uptr data)397 bool nnosSafeBlockingQueueTryEnqueue(nnosSafeBlockingQueue* this_, uptr data)
398 {
399     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
400     return pBlockingQueue->TryEnqueue(data);
401 }
402 
nnosSafeBlockingQueueEnqueue(nnosSafeBlockingQueue * this_,uptr data)403 void nnosSafeBlockingQueueEnqueue(nnosSafeBlockingQueue* this_, uptr data)
404 {
405     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
406     pBlockingQueue->Enqueue(data);
407 }
408 
nnosSafeBlockingQueueTryJam(nnosSafeBlockingQueue * this_,uptr data)409 bool nnosSafeBlockingQueueTryJam(nnosSafeBlockingQueue* this_, uptr data)
410 {
411     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
412     return pBlockingQueue->TryJam(data);
413 }
414 
nnosSafeBlockingQueueJam(nnosSafeBlockingQueue * this_,uptr data)415 void nnosSafeBlockingQueueJam(nnosSafeBlockingQueue* this_, uptr data)
416 {
417     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
418     pBlockingQueue->Jam(data);
419 }
420 
nnosSafeBlockingQueueTryDequeue(nnosSafeBlockingQueue * this_,uptr * pOut)421 bool nnosSafeBlockingQueueTryDequeue(nnosSafeBlockingQueue* this_, uptr* pOut)
422 {
423     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
424     return pBlockingQueue->TryDequeue(pOut);
425 }
426 
nnosSafeBlockingQueueDequeue(nnosSafeBlockingQueue * this_)427 uptr nnosSafeBlockingQueueDequeue(nnosSafeBlockingQueue* this_)
428 {
429     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
430     return pBlockingQueue->Dequeue();
431 }
432 
nnosSafeBlockingQueueTryGetFront(nnosSafeBlockingQueue * this_,uptr * pOut)433 bool nnosSafeBlockingQueueTryGetFront(nnosSafeBlockingQueue* this_, uptr* pOut)
434 {
435     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
436     return pBlockingQueue->TryGetFront(pOut);
437 }
438 
nnosSafeBlockingQueueGetFront(nnosSafeBlockingQueue * this_)439 uptr nnosSafeBlockingQueueGetFront(nnosSafeBlockingQueue* this_)
440 {
441     SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
442     return pBlockingQueue->GetFront();
443 }
444 
445 }
446