1 /*---------------------------------------------------------------------------*
2 Project: Horizon
3 File: os_BlockingQueue.cpp
4
5 Copyright (C)2009-2012 Nintendo Co., Ltd. All rights reserved.
6
7 These coded instructions, statements, and computer programs contain
8 proprietary information of Nintendo of America Inc. and/or Nintendo
9 Company Ltd., and are protected by Federal copyright law. They may
10 not be disclosed to third parties or copied or duplicated in any form,
11 in whole or in part, without the prior written consent of Nintendo.
12
13 $Rev: 46347 $
14 *---------------------------------------------------------------------------*/
15
16 #include <nn/assert.h>
17 #include <nn/config.h>
18 #include <nn/os/os_BlockingQueue.h>
19 #include <nn/os/os_Mutex.h>
20 #include <nn/os/os_CriticalSection.h>
21 #include <nn/fnd/fnd_Interlocked.h>
22 #include <nn/util/detail/util_InitializationTransaction.h>
23
24 #include <climits>
25
26 //---------------------------------------------------------------------------
27
28 using namespace nn;
29 using namespace nn::fnd;
30 using namespace nn::svc;
31 using namespace nn::os;
32 using namespace nn::util;
33
34 namespace nn{ namespace os{
35
36 namespace detail {
37
38 template <class Locker>
~BlockingQueueBase()39 BlockingQueueBase<Locker>::~BlockingQueueBase()
40 {
41 NN_TASSERT_(m_WaitingEnqueueCount == 0 && m_WaitingDequeueCount == 0);
42 Finalize();
43 }
44
45 template <class Locker>
Initialize(uptr buffer[],size_t size)46 void BlockingQueueBase<Locker>::Initialize(uptr buffer[], size_t size)
47 {
48 m_ppBuffer = buffer;
49 m_size = size;
50 m_firstIndex = 0;
51 m_usedCount = 0;
52 m_WaitingEnqueueCount = 0;
53 m_WaitingDequeueCount = 0;
54 m_EnqueueSemaphore.Initialize(0);
55 m_DequeueSemaphore.Initialize(0);
56 m_cs.Initialize();
57 }
58
59 template <class Locker>
TryInitialize(uptr buffer[],size_t size)60 nn::Result BlockingQueueBase<Locker>::TryInitialize(uptr buffer[], size_t size)
61 {
62 m_ppBuffer = buffer;
63 m_size = size;
64 m_firstIndex = 0;
65 m_usedCount = 0;
66 m_WaitingEnqueueCount = 0;
67 m_WaitingDequeueCount = 0;
68
69 m_EnqueueSemaphore.Initialize(0);
70 m_DequeueSemaphore.Initialize(0);
71
72 NN_UTIL_RETURN_IF_FAILED(m_cs.TryInitialize());
73 return ResultSuccess();
74 }
75
76 template <class Locker>
Finalize()77 void BlockingQueueBase<Locker>::Finalize()
78 {
79 m_cs.Finalize();
80 m_DequeueSemaphore.Finalize();
81 m_EnqueueSemaphore.Finalize();
82 }
83
84
85 template <class Locker>
NotifyEnqueue() const86 inline void BlockingQueueBase<Locker>::NotifyEnqueue() const
87 {
88 if (m_WaitingEnqueueCount > 0)
89 {
90 m_EnqueueSemaphore.Release();
91 }
92 }
93
94 template <class Locker>
NotifyDequeue() const95 inline void BlockingQueueBase<Locker>::NotifyDequeue() const
96 {
97 if (m_WaitingDequeueCount > 0)
98 {
99 m_DequeueSemaphore.Release();
100 }
101 }
102
103 template <class Locker>
TryEnqueue(uptr data)104 bool BlockingQueueBase<Locker>::TryEnqueue(uptr data)
105 {
106 // Locks while inserting data in the queue so that other threads cannot perform operations.
107 ScopedLock locker(m_cs);
108
109 if (m_size > m_usedCount)
110 {
111 s32 lastIndex = (m_firstIndex + m_usedCount) % m_size;
112 m_ppBuffer[lastIndex] = data;
113 m_usedCount++;
114
115 // Wakes up thread waiting for data insertion.
116 NotifyEnqueue();
117 return true;
118 }
119 else
120 {
121 return false;
122 }
123 }
124
125 template <class Locker>
ForceEnqueue(uptr data,uptr * pOut)126 bool BlockingQueueBase<Locker>::ForceEnqueue(uptr data, uptr* pOut)
127 {
128 // Locks while inserting data in the queue so that other threads cannot perform operations.
129 ScopedLock locker(m_cs);
130 bool bReturn;
131 s32 lastIndex = (m_firstIndex + m_usedCount) % m_size;
132 if (m_size > m_usedCount)
133 {
134 m_usedCount++;
135 bReturn = true;
136 }
137 else
138 {
139 if (pOut)
140 {
141 *pOut = m_ppBuffer[lastIndex];
142 }
143 m_firstIndex = (m_firstIndex + 1) % m_size;
144 bReturn = false;
145 }
146
147 m_ppBuffer[lastIndex] = data;
148 // Wakes up thread waiting for data insertion.
149 NotifyEnqueue();
150 return bReturn;
151 }
152
153 template <class Locker>
Enqueue(uptr data)154 void BlockingQueueBase<Locker>::Enqueue(uptr data)
155 {
156 // Updates the number of threads during the process to insert in queue.
157 ++m_WaitingDequeueCount;
158 for(;;)
159 {
160 if (TryEnqueue(data))
161 {
162 break;
163 }
164
165 // Waits until there is space in the queue.
166 m_DequeueSemaphore.Acquire();
167 }
168 --m_WaitingDequeueCount;
169 }
170
171 template <class Locker>
TryJam(uptr data)172 bool BlockingQueueBase<Locker>::TryJam(uptr data)
173 {
174 // Locks while inserting data in the queue so that other threads cannot perform operations.
175 ScopedLock locker(m_cs);
176
177 if (m_size > m_usedCount)
178 {
179 m_firstIndex = (m_firstIndex + m_size - 1) % m_size;
180 m_ppBuffer[m_firstIndex] = data;
181 m_usedCount++;
182
183 // Wakes up thread waiting for data insertion.
184 NotifyEnqueue();
185 return true;
186 }
187 else
188 {
189 return false;
190 }
191 }
192
193 template <class Locker>
Jam(uptr data)194 void BlockingQueueBase<Locker>::Jam(uptr data)
195 {
196 // Updates the number of threads during the process to insert in queue.
197 ++m_WaitingDequeueCount;
198 for(;;)
199 {
200 if (TryJam(data))
201 {
202 break;
203 }
204
205 // Waits until there is space in the queue.
206 m_DequeueSemaphore.Acquire();
207 }
208 --m_WaitingDequeueCount;
209 }
210
211 template <class Locker>
TryDequeue(uptr * pOut)212 bool BlockingQueueBase<Locker>::TryDequeue(uptr* pOut)
213 {
214 // Locks while retrieving data from the queue so that other threads cannot perform operations.
215 ScopedLock locker(m_cs);
216
217 if (0 < m_usedCount)
218 {
219 *pOut = m_ppBuffer[m_firstIndex];
220 m_firstIndex = (m_firstIndex + 1) % m_size;
221 m_usedCount--;
222
223 // Wakes up the thread waiting for a space in the queue.
224 NotifyDequeue();
225 return true;
226 }
227 else
228 {
229 return false;
230 }
231 }
232
233 template <class Locker>
Dequeue()234 uptr BlockingQueueBase<Locker>::Dequeue()
235 {
236 // Updates the number of threads during the process to retrieve data from the queue.
237 ++m_WaitingEnqueueCount;
238 uptr data;
239 for(;;)
240 {
241 if (TryDequeue(&data))
242 {
243 break;
244 }
245
246 // Waits when the queue content is empty.
247 m_EnqueueSemaphore.Acquire();
248 }
249 --m_WaitingEnqueueCount;
250 return data;
251 }
252
253 template <class Locker>
TryGetFront(uptr * pOut) const254 bool BlockingQueueBase<Locker>::TryGetFront(uptr* pOut) const
255 {
256 // Locks while retrieving data from the queue so that other threads cannot perform operations.
257 ScopedLock locker(m_cs);
258
259 if (0 < m_usedCount)
260 {
261 *pOut = m_ppBuffer[m_firstIndex];
262
263 return true;
264 }
265 else
266 {
267 return false;
268 }
269 }
270
271 template <class Locker>
GetFront() const272 uptr BlockingQueueBase<Locker>::GetFront() const
273 {
274 // Updates the number of threads during the process to retrieve data from the queue.
275 ++m_WaitingEnqueueCount;
276 uptr data;
277 for(;;)
278 {
279 if (TryGetFront(&data))
280 {
281 break;
282 }
283
284 // Waits when the queue content is empty.
285 m_EnqueueSemaphore.Acquire();
286 }
287 --m_WaitingEnqueueCount;
288 return data;
289 }
290
291 template class BlockingQueueBase<nn::os::CriticalSection>;
292 template class BlockingQueueBase<nn::os::Mutex>;
293
294 } // namespace detail
295
296 }} // namespace nn::os
297
298
299 #include <new>
300
301 using namespace nn::os;
302
303 extern "C" {
304
nnosBlockingQueueInitialize(nnosBlockingQueue * this_,uptr buffer[],size_t size)305 void nnosBlockingQueueInitialize(nnosBlockingQueue* this_, uptr buffer[], size_t size)
306 {
307 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
308 new (pBlockingQueue) BlockingQueue();
309 pBlockingQueue->Initialize(buffer, size);
310 }
311
nnosBlockingQueueTryInitialize(nnosBlockingQueue * this_,uptr buffer[],size_t size)312 bool nnosBlockingQueueTryInitialize(nnosBlockingQueue* this_, uptr buffer[], size_t size)
313 {
314 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
315 new (pBlockingQueue) BlockingQueue();
316 Result result = pBlockingQueue->TryInitialize(buffer, size);
317 return result.IsSuccess();
318 }
319
nnosBlockingQueueFinalize(nnosBlockingQueue * this_)320 void nnosBlockingQueueFinalize(nnosBlockingQueue* this_)
321 {
322 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
323 pBlockingQueue->Finalize();
324 pBlockingQueue->~BlockingQueue();
325 }
326
nnosBlockingQueueTryEnqueue(nnosBlockingQueue * this_,uptr data)327 bool nnosBlockingQueueTryEnqueue(nnosBlockingQueue* this_, uptr data)
328 {
329 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
330 return pBlockingQueue->TryEnqueue(data);
331 }
332
nnosBlockingQueueEnqueue(nnosBlockingQueue * this_,uptr data)333 void nnosBlockingQueueEnqueue(nnosBlockingQueue* this_, uptr data)
334 {
335 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
336 pBlockingQueue->Enqueue(data);
337 }
338
nnosBlockingQueueTryJam(nnosBlockingQueue * this_,uptr data)339 bool nnosBlockingQueueTryJam(nnosBlockingQueue* this_, uptr data)
340 {
341 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
342 return pBlockingQueue->TryJam(data);
343 }
344
nnosBlockingQueueJam(nnosBlockingQueue * this_,uptr data)345 void nnosBlockingQueueJam(nnosBlockingQueue* this_, uptr data)
346 {
347 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
348 pBlockingQueue->Jam(data);
349 }
350
nnosBlockingQueueTryDequeue(nnosBlockingQueue * this_,uptr * pOut)351 bool nnosBlockingQueueTryDequeue(nnosBlockingQueue* this_, uptr* pOut)
352 {
353 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
354 return pBlockingQueue->TryDequeue(pOut);
355 }
356
nnosBlockingQueueDequeue(nnosBlockingQueue * this_)357 uptr nnosBlockingQueueDequeue(nnosBlockingQueue* this_)
358 {
359 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
360 return pBlockingQueue->Dequeue();
361 }
362
nnosBlockingQueueTryGetFront(nnosBlockingQueue * this_,uptr * pOut)363 bool nnosBlockingQueueTryGetFront(nnosBlockingQueue* this_, uptr* pOut)
364 {
365 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
366 return pBlockingQueue->TryGetFront(pOut);
367 }
368
nnosBlockingQueueGetFront(nnosBlockingQueue * this_)369 uptr nnosBlockingQueueGetFront(nnosBlockingQueue* this_)
370 {
371 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
372 return pBlockingQueue->GetFront();
373 }
374
nnosSafeBlockingQueueInitialize(nnosSafeBlockingQueue * this_,uptr buffer[],size_t size)375 void nnosSafeBlockingQueueInitialize(nnosSafeBlockingQueue* this_, uptr buffer[], size_t size)
376 {
377 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
378 new (pBlockingQueue) SafeBlockingQueue();
379 pBlockingQueue->Initialize(buffer, size);
380 }
381
nnosSafeBlockingQueueTryInitialize(nnosSafeBlockingQueue * this_,uptr buffer[],size_t size)382 bool nnosSafeBlockingQueueTryInitialize(nnosSafeBlockingQueue* this_, uptr buffer[], size_t size)
383 {
384 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
385 new (pBlockingQueue) SafeBlockingQueue();
386 Result result = pBlockingQueue->TryInitialize(buffer, size);
387 return result.IsSuccess();
388 }
389
nnosSafeBlockingQueueFinalize(nnosSafeBlockingQueue * this_)390 void nnosSafeBlockingQueueFinalize(nnosSafeBlockingQueue* this_)
391 {
392 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
393 pBlockingQueue->Finalize();
394 pBlockingQueue->~SafeBlockingQueue();
395 }
396
nnosSafeBlockingQueueTryEnqueue(nnosSafeBlockingQueue * this_,uptr data)397 bool nnosSafeBlockingQueueTryEnqueue(nnosSafeBlockingQueue* this_, uptr data)
398 {
399 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
400 return pBlockingQueue->TryEnqueue(data);
401 }
402
nnosSafeBlockingQueueEnqueue(nnosSafeBlockingQueue * this_,uptr data)403 void nnosSafeBlockingQueueEnqueue(nnosSafeBlockingQueue* this_, uptr data)
404 {
405 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
406 pBlockingQueue->Enqueue(data);
407 }
408
nnosSafeBlockingQueueTryJam(nnosSafeBlockingQueue * this_,uptr data)409 bool nnosSafeBlockingQueueTryJam(nnosSafeBlockingQueue* this_, uptr data)
410 {
411 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
412 return pBlockingQueue->TryJam(data);
413 }
414
nnosSafeBlockingQueueJam(nnosSafeBlockingQueue * this_,uptr data)415 void nnosSafeBlockingQueueJam(nnosSafeBlockingQueue* this_, uptr data)
416 {
417 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
418 pBlockingQueue->Jam(data);
419 }
420
nnosSafeBlockingQueueTryDequeue(nnosSafeBlockingQueue * this_,uptr * pOut)421 bool nnosSafeBlockingQueueTryDequeue(nnosSafeBlockingQueue* this_, uptr* pOut)
422 {
423 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
424 return pBlockingQueue->TryDequeue(pOut);
425 }
426
nnosSafeBlockingQueueDequeue(nnosSafeBlockingQueue * this_)427 uptr nnosSafeBlockingQueueDequeue(nnosSafeBlockingQueue* this_)
428 {
429 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
430 return pBlockingQueue->Dequeue();
431 }
432
nnosSafeBlockingQueueTryGetFront(nnosSafeBlockingQueue * this_,uptr * pOut)433 bool nnosSafeBlockingQueueTryGetFront(nnosSafeBlockingQueue* this_, uptr* pOut)
434 {
435 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
436 return pBlockingQueue->TryGetFront(pOut);
437 }
438
nnosSafeBlockingQueueGetFront(nnosSafeBlockingQueue * this_)439 uptr nnosSafeBlockingQueueGetFront(nnosSafeBlockingQueue* this_)
440 {
441 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
442 return pBlockingQueue->GetFront();
443 }
444
445 }
446