1 /*---------------------------------------------------------------------------*
2 Project: Horizon
3 File: os_BlockingQueue.cpp
4
5 Copyright (C)2009 Nintendo Co., Ltd. All rights reserved.
6
7 These coded instructions, statements, and computer programs contain
8 proprietary information of Nintendo of America Inc. and/or Nintendo
9 Company Ltd., and are protected by Federal copyright law. They may
10 not be disclosed to third parties or copied or duplicated in any form,
11 in whole or in part, without the prior written consent of Nintendo.
12
13 $Rev: 28736 $
14 *---------------------------------------------------------------------------*/
15
16 #include <nn/assert.h>
17 #include <nn/config.h>
18 #include <nn/os/os_BlockingQueue.h>
19 #include <nn/os/os_Mutex.h>
20 #include <nn/os/os_CriticalSection.h>
21 #include <nn/os/os_InterCoreCriticalSection.h>
22 #include <nn/fnd/fnd_Interlocked.h>
23 #include <nn/util/detail/util_InitializationTransaction.h>
24
25 #include <climits>
26
27 //---------------------------------------------------------------------------
28
29 using namespace nn;
30 using namespace nn::fnd;
31 using namespace nn::svc;
32 using namespace nn::os;
33 using namespace nn::util;
34
35 namespace nn{ namespace os{
36
37 namespace detail {
38
39 template <class Locker>
~BlockingQueueBase()40 BlockingQueueBase<Locker>::~BlockingQueueBase()
41 {
42 NN_TASSERT_(m_WaitingEnqueueCount == 0 && m_WaitingDequeueCount == 0);
43 Finalize();
44 }
45
46 template <class Locker>
Initialize(uptr buffer[],size_t size)47 void BlockingQueueBase<Locker>::Initialize(uptr buffer[], size_t size)
48 {
49 m_ppBuffer = buffer;
50 m_size = size;
51 m_firstIndex = 0;
52 m_usedCount = 0;
53 m_WaitingEnqueueCount = 0;
54 m_WaitingDequeueCount = 0;
55 m_EnqueueSemaphore.Initialize(0, INT_MAX);
56 m_DequeueSemaphore.Initialize(0, INT_MAX);
57 m_cs.Initialize();
58 }
59
60 template <class Locker>
TryInitialize(uptr buffer[],size_t size)61 nn::Result BlockingQueueBase<Locker>::TryInitialize(uptr buffer[], size_t size)
62 {
63 m_ppBuffer = buffer;
64 m_size = size;
65 m_firstIndex = 0;
66 m_usedCount = 0;
67 m_WaitingEnqueueCount = 0;
68 m_WaitingDequeueCount = 0;
69
70 m_EnqueueSemaphore.Initialize(0, INT_MAX);
71 m_DequeueSemaphore.Initialize(0, INT_MAX);
72
73 NN_UTIL_RETURN_IF_FAILED(m_cs.TryInitialize());
74 return ResultSuccess();
75 }
76
77 template <class Locker>
Finalize()78 void BlockingQueueBase<Locker>::Finalize()
79 {
80 m_cs.Finalize();
81 m_DequeueSemaphore.Finalize();
82 m_EnqueueSemaphore.Finalize();
83 }
84
85
86 template <class Locker>
NotifyEnqueue() const87 inline void BlockingQueueBase<Locker>::NotifyEnqueue() const
88 {
89 if (m_WaitingEnqueueCount > 0)
90 {
91 m_EnqueueSemaphore.Release();
92 }
93 }
94
95 template <class Locker>
NotifyDequeue() const96 inline void BlockingQueueBase<Locker>::NotifyDequeue() const
97 {
98 if (m_WaitingDequeueCount > 0)
99 {
100 m_DequeueSemaphore.Release();
101 }
102 }
103
104 template <class Locker>
TryEnqueue(uptr data)105 bool BlockingQueueBase<Locker>::TryEnqueue(uptr data)
106 {
107 // キューにデータ挿入中に他のスレッドから操作されないようにロックします。
108 ScopedLock locker(m_cs);
109
110 if (m_size > m_usedCount)
111 {
112 s32 lastIndex = (m_firstIndex + m_usedCount) % m_size;
113 m_ppBuffer[lastIndex] = data;
114 m_usedCount++;
115
116 // データ挿入待ちスレッドを起床します。
117 NotifyEnqueue();
118 return true;
119 }
120 else
121 {
122 return false;
123 }
124 }
125
126 template <class Locker>
ForceEnqueue(uptr data,uptr * pOut)127 bool BlockingQueueBase<Locker>::ForceEnqueue(uptr data, uptr* pOut)
128 {
129 // キューにデータを挿入中に他のスレッドから操作されないようにロックします。
130 ScopedLock locker(m_cs);
131 bool bReturn;
132 s32 lastIndex = (m_firstIndex + m_usedCount) % m_size;
133 if (m_size > m_usedCount)
134 {
135 m_usedCount++;
136 bReturn = true;
137 }
138 else
139 {
140 if (pOut)
141 {
142 *pOut = m_ppBuffer[lastIndex];
143 }
144 m_firstIndex = (m_firstIndex + 1) % m_size;
145 bReturn = false;
146 }
147
148 m_ppBuffer[lastIndex] = data;
149 // データ挿入待ちスレッドを起床します。
150 NotifyEnqueue();
151 return bReturn;
152 }
153
154 template <class Locker>
Enqueue(uptr data)155 void BlockingQueueBase<Locker>::Enqueue(uptr data)
156 {
157 // キューに挿入処理中のスレッド数を更新します。
158 ++m_WaitingDequeueCount;
159 for(;;)
160 {
161 if (TryEnqueue(data))
162 {
163 break;
164 }
165
166 // キューに空きができるまで待機します。
167 m_DequeueSemaphore.Acquire();
168 }
169 --m_WaitingDequeueCount;
170 }
171
172 template <class Locker>
TryJam(uptr data)173 bool BlockingQueueBase<Locker>::TryJam(uptr data)
174 {
175 // キューにデータを挿入中に他のスレッドから操作されないようにロックします。
176 ScopedLock locker(m_cs);
177
178 if (m_size > m_usedCount)
179 {
180 m_firstIndex = (m_firstIndex + m_size - 1) % m_size;
181 m_ppBuffer[m_firstIndex] = data;
182 m_usedCount++;
183
184 // データ挿入待ちスレッドを起床します。
185 NotifyEnqueue();
186 return true;
187 }
188 else
189 {
190 return false;
191 }
192 }
193
194 template <class Locker>
Jam(uptr data)195 void BlockingQueueBase<Locker>::Jam(uptr data)
196 {
197 // キューに挿入処理中のスレッド数を更新します。
198 ++m_WaitingDequeueCount;
199 for(;;)
200 {
201 if (TryJam(data))
202 {
203 break;
204 }
205
206 // キューに空きができるまで待機します。
207 m_DequeueSemaphore.Acquire();
208 }
209 --m_WaitingDequeueCount;
210 }
211
212 template <class Locker>
TryDequeue(uptr * pOut)213 bool BlockingQueueBase<Locker>::TryDequeue(uptr* pOut)
214 {
215 // キューからデータを取り出し中に他のスレッドから操作されないようにロックします。
216 ScopedLock locker(m_cs);
217
218 if (0 < m_usedCount)
219 {
220 *pOut = m_ppBuffer[m_firstIndex];
221 m_firstIndex = (m_firstIndex + 1) % m_size;
222 m_usedCount--;
223
224 // キュー空き待ちスレッドを起床します。
225 NotifyDequeue();
226 return true;
227 }
228 else
229 {
230 return false;
231 }
232 }
233
234 template <class Locker>
Dequeue()235 uptr BlockingQueueBase<Locker>::Dequeue()
236 {
237 // キューから取り出し処理中のスレッド数を更新します。
238 ++m_WaitingEnqueueCount;
239 uptr data;
240 for(;;)
241 {
242 if (TryDequeue(&data))
243 {
244 break;
245 }
246
247 // キューの内容が空の場合は待機します。
248 m_EnqueueSemaphore.Acquire();
249 }
250 --m_WaitingEnqueueCount;
251 return data;
252 }
253
254 template <class Locker>
TryGetFront(uptr * pOut) const255 bool BlockingQueueBase<Locker>::TryGetFront(uptr* pOut) const
256 {
257 // キューからデータを取り出し中に他のスレッドから操作されないようにロックします。
258 ScopedLock locker(m_cs);
259
260 if (0 < m_usedCount)
261 {
262 *pOut = m_ppBuffer[m_firstIndex];
263
264 return true;
265 }
266 else
267 {
268 return false;
269 }
270 }
271
272 template <class Locker>
GetFront() const273 uptr BlockingQueueBase<Locker>::GetFront() const
274 {
275 // キューから取り出し処理中のスレッド数を更新します。
276 ++m_WaitingEnqueueCount;
277 uptr data;
278 for(;;)
279 {
280 if (TryGetFront(&data))
281 {
282 break;
283 }
284
285 // キューの内容が空の場合は待機します。
286 m_EnqueueSemaphore.Acquire();
287 }
288 --m_WaitingEnqueueCount;
289 return data;
290 }
291
292 template class BlockingQueueBase<nn::os::CriticalSection>;
293 template class BlockingQueueBase<nn::os::InterCoreCriticalSection>;
294 template class BlockingQueueBase<nn::os::Mutex>;
295
296 } // namespace detail
297
298 }} // namespace nn::os
299
300
301 #include <new>
302
303 using namespace nn::os;
304
305 extern "C" {
306
nnosBlockingQueueInitialize(nnosBlockingQueue * this_,uptr buffer[],size_t size)307 void nnosBlockingQueueInitialize(nnosBlockingQueue* this_, uptr buffer[], size_t size)
308 {
309 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
310 new (pBlockingQueue) BlockingQueue();
311 pBlockingQueue->Initialize(buffer, size);
312 }
313
nnosBlockingQueueTryInitialize(nnosBlockingQueue * this_,uptr buffer[],size_t size)314 bool nnosBlockingQueueTryInitialize(nnosBlockingQueue* this_, uptr buffer[], size_t size)
315 {
316 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
317 new (pBlockingQueue) BlockingQueue();
318 Result result = pBlockingQueue->TryInitialize(buffer, size);
319 return result.IsSuccess();
320 }
321
nnosBlockingQueueFinalize(nnosBlockingQueue * this_)322 void nnosBlockingQueueFinalize(nnosBlockingQueue* this_)
323 {
324 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
325 pBlockingQueue->Finalize();
326 pBlockingQueue->~BlockingQueue();
327 }
328
nnosBlockingQueueTryEnqueue(nnosBlockingQueue * this_,uptr data)329 bool nnosBlockingQueueTryEnqueue(nnosBlockingQueue* this_, uptr data)
330 {
331 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
332 return pBlockingQueue->TryEnqueue(data);
333 }
334
nnosBlockingQueueEnqueue(nnosBlockingQueue * this_,uptr data)335 void nnosBlockingQueueEnqueue(nnosBlockingQueue* this_, uptr data)
336 {
337 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
338 pBlockingQueue->Enqueue(data);
339 }
340
nnosBlockingQueueTryJam(nnosBlockingQueue * this_,uptr data)341 bool nnosBlockingQueueTryJam(nnosBlockingQueue* this_, uptr data)
342 {
343 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
344 return pBlockingQueue->TryJam(data);
345 }
346
nnosBlockingQueueJam(nnosBlockingQueue * this_,uptr data)347 void nnosBlockingQueueJam(nnosBlockingQueue* this_, uptr data)
348 {
349 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
350 pBlockingQueue->Jam(data);
351 }
352
nnosBlockingQueueTryDequeue(nnosBlockingQueue * this_,uptr * pOut)353 bool nnosBlockingQueueTryDequeue(nnosBlockingQueue* this_, uptr* pOut)
354 {
355 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
356 return pBlockingQueue->TryDequeue(pOut);
357 }
358
nnosBlockingQueueDequeue(nnosBlockingQueue * this_)359 uptr nnosBlockingQueueDequeue(nnosBlockingQueue* this_)
360 {
361 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
362 return pBlockingQueue->Dequeue();
363 }
364
nnosBlockingQueueTryGetFront(nnosBlockingQueue * this_,uptr * pOut)365 bool nnosBlockingQueueTryGetFront(nnosBlockingQueue* this_, uptr* pOut)
366 {
367 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
368 return pBlockingQueue->TryGetFront(pOut);
369 }
370
nnosBlockingQueueGetFront(nnosBlockingQueue * this_)371 uptr nnosBlockingQueueGetFront(nnosBlockingQueue* this_)
372 {
373 BlockingQueue* pBlockingQueue = reinterpret_cast<BlockingQueue*>(this_);
374 return pBlockingQueue->GetFront();
375 }
376
nnosSafeBlockingQueueInitialize(nnosSafeBlockingQueue * this_,uptr buffer[],size_t size)377 void nnosSafeBlockingQueueInitialize(nnosSafeBlockingQueue* this_, uptr buffer[], size_t size)
378 {
379 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
380 new (pBlockingQueue) SafeBlockingQueue();
381 pBlockingQueue->Initialize(buffer, size);
382 }
383
nnosSafeBlockingQueueTryInitialize(nnosSafeBlockingQueue * this_,uptr buffer[],size_t size)384 bool nnosSafeBlockingQueueTryInitialize(nnosSafeBlockingQueue* this_, uptr buffer[], size_t size)
385 {
386 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
387 new (pBlockingQueue) SafeBlockingQueue();
388 Result result = pBlockingQueue->TryInitialize(buffer, size);
389 return result.IsSuccess();
390 }
391
nnosSafeBlockingQueueFinalize(nnosSafeBlockingQueue * this_)392 void nnosSafeBlockingQueueFinalize(nnosSafeBlockingQueue* this_)
393 {
394 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
395 pBlockingQueue->Finalize();
396 pBlockingQueue->~SafeBlockingQueue();
397 }
398
nnosSafeBlockingQueueTryEnqueue(nnosSafeBlockingQueue * this_,uptr data)399 bool nnosSafeBlockingQueueTryEnqueue(nnosSafeBlockingQueue* this_, uptr data)
400 {
401 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
402 return pBlockingQueue->TryEnqueue(data);
403 }
404
nnosSafeBlockingQueueEnqueue(nnosSafeBlockingQueue * this_,uptr data)405 void nnosSafeBlockingQueueEnqueue(nnosSafeBlockingQueue* this_, uptr data)
406 {
407 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
408 pBlockingQueue->Enqueue(data);
409 }
410
nnosSafeBlockingQueueTryJam(nnosSafeBlockingQueue * this_,uptr data)411 bool nnosSafeBlockingQueueTryJam(nnosSafeBlockingQueue* this_, uptr data)
412 {
413 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
414 return pBlockingQueue->TryJam(data);
415 }
416
nnosSafeBlockingQueueJam(nnosSafeBlockingQueue * this_,uptr data)417 void nnosSafeBlockingQueueJam(nnosSafeBlockingQueue* this_, uptr data)
418 {
419 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
420 pBlockingQueue->Jam(data);
421 }
422
nnosSafeBlockingQueueTryDequeue(nnosSafeBlockingQueue * this_,uptr * pOut)423 bool nnosSafeBlockingQueueTryDequeue(nnosSafeBlockingQueue* this_, uptr* pOut)
424 {
425 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
426 return pBlockingQueue->TryDequeue(pOut);
427 }
428
nnosSafeBlockingQueueDequeue(nnosSafeBlockingQueue * this_)429 uptr nnosSafeBlockingQueueDequeue(nnosSafeBlockingQueue* this_)
430 {
431 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
432 return pBlockingQueue->Dequeue();
433 }
434
nnosSafeBlockingQueueTryGetFront(nnosSafeBlockingQueue * this_,uptr * pOut)435 bool nnosSafeBlockingQueueTryGetFront(nnosSafeBlockingQueue* this_, uptr* pOut)
436 {
437 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
438 return pBlockingQueue->TryGetFront(pOut);
439 }
440
nnosSafeBlockingQueueGetFront(nnosSafeBlockingQueue * this_)441 uptr nnosSafeBlockingQueueGetFront(nnosSafeBlockingQueue* this_)
442 {
443 SafeBlockingQueue* pBlockingQueue = reinterpret_cast<SafeBlockingQueue*>(this_);
444 return pBlockingQueue->GetFront();
445 }
446
447 }
448