AkFifoQueue.h 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. /*******************************************************************************
  2. The content of this file includes portions of the AUDIOKINETIC Wwise Technology
  3. released in source code form as part of the SDK installer package.
  4. Commercial License Usage
  5. Licensees holding valid commercial licenses to the AUDIOKINETIC Wwise Technology
  6. may use this file in accordance with the end user license agreement provided
  7. with the software or, alternatively, in accordance with the terms contained in a
  8. written agreement between you and Audiokinetic Inc.
  9. Apache License Usage
  10. Alternatively, this file may be used under the Apache License, Version 2.0 (the
  11. "Apache License"); you may not use this file except in compliance with the
  12. Apache License. You may obtain a copy of the Apache License at
  13. http://www.apache.org/licenses/LICENSE-2.0.
  14. Unless required by applicable law or agreed to in writing, software distributed
  15. under the Apache License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
  16. OR CONDITIONS OF ANY KIND, either express or implied. See the Apache License for
  17. the specific language governing permissions and limitations under the License.
  18. Copyright (c) 2023 Audiokinetic Inc.
  19. *******************************************************************************/
  20. #ifndef _AKFIFOQUEUE_H
  21. #define _AKFIFOQUEUE_H
  22. #include <AK/SoundEngine/Common/AkTypes.h>
  23. #include <AK/SoundEngine/Common/AkAtomic.h>
  24. #include <AK/Tools/Common/AkArray.h>
  25. /// AkFifoQueue is a lock-less, thread-safe, multi-producer-multi-consumer queue data structure.
  26. /// It is designed to hold copyable values.
  27. template<typename T, T TDEFAULT, class TAlloc = ArrayPoolDefault>
  28. struct AkFifoQueue : public TAlloc
  29. {
  30. public:
  31. AkFifoQueue()
  32. : m_buffer(nullptr)
  33. , m_uQueueIndexMask(0)
  34. , m_readPos(0)
  35. , m_writePos(0)
  36. {
  37. }
  38. ~AkFifoQueue()
  39. {
  40. Term();
  41. }
  42. /// Initializes the FifoQueue and allocates memory for the specified number of entries.
  43. /// The number of entries is not growable after initialization.
  44. AKRESULT Init(
  45. AkUInt32 in_uMaxEntries ///< The number of entries. Must be a power of two.
  46. )
  47. {
  48. // check that maxentries is a power of 2
  49. AKASSERT((in_uMaxEntries & (in_uMaxEntries - 1)) == 0);
  50. m_uQueueIndexMask = in_uMaxEntries - 1;
  51. m_writePos = 0;
  52. m_readPos = 0;
  53. m_buffer = (FifoQueueEntry*)TAlloc::Alloc(sizeof(FifoQueueEntry) * in_uMaxEntries);
  54. if (m_buffer == nullptr)
  55. {
  56. return AK_InsufficientMemory;
  57. }
  58. AkZeroMemLarge(m_buffer, sizeof(FifoQueueEntry) * in_uMaxEntries);
  59. for (AkUInt32 i = 0; i < in_uMaxEntries; ++i)
  60. {
  61. m_buffer[i].value = TDEFAULT;
  62. AkAtomicStore64(&m_buffer[i].uSequence, i);
  63. }
  64. return AK_Success;
  65. }
  66. /// Free memory reserved for the queue and reset internal state
  67. /// The queue MUST be empty when this is called!
  68. void Term()
  69. {
  70. if (m_buffer)
  71. {
  72. AKASSERT(m_readPos == m_writePos);
  73. TAlloc::Free(m_buffer);
  74. m_buffer = nullptr;
  75. }
  76. m_readPos = 0;
  77. m_writePos = 0;
  78. }
  79. /// Enqueues the provided value. The value will be copied to the queue's internal buffer.
  80. /// Returns true if the enqueue was performed successfully.
  81. /// Returns false if the enqueue could not be performed. This can happen if the queue is "full", and some dequeue operations have to occur.
  82. AK_NODISCARD bool Enqueue(T in_value)
  83. {
  84. const AkUInt64 uQueueIndexMask = m_uQueueIndexMask;
  85. FifoQueueEntry* pBuffer = m_buffer;
  86. AkInt64 writePos = AkAtomicLoad64(&m_writePos);
  87. do {
  88. // see where we are in the sequence, relative to where we can write data
  89. AkInt64 sequenceDelta = AkAtomicLoad64(&pBuffer[writePos & uQueueIndexMask].uSequence) - writePos;
  90. // if we're in the right spot, and we can successfully write an updated write position, break out and write the handle into the queue
  91. if (sequenceDelta == 0)
  92. {
  93. if (AkAtomicCas64(&m_writePos, writePos + 1, writePos))
  94. {
  95. break;
  96. }
  97. }
  98. else if (sequenceDelta < 0)
  99. {
  100. // we would have over-enqueued if we tried to write the position in. Return false; the user needs to decide how to handle things
  101. return false;
  102. }
  103. else
  104. {
  105. // if it didn't work, reload writePos: someone else must have written to the sequence and we need to get caught up
  106. writePos = AkAtomicLoad64(&m_writePos);
  107. }
  108. } while (true);
  109. // advance the sequence by one so that it can be dequeued
  110. pBuffer[writePos & uQueueIndexMask].value = in_value;
  111. AkAtomicStore64(&pBuffer[writePos & uQueueIndexMask].uSequence, writePos + 1);
  112. return true;
  113. }
  114. /// Dequeues a value from the specified queue, copying it to io_value
  115. /// \return true if a value was successfully dequeued, false otherwise (if false, io_value will not be written to)
  116. bool Dequeue(T& io_value)
  117. {
  118. const AkInt64 uQueueIndexMask = m_uQueueIndexMask;
  119. FifoQueueEntry* pBuffer = m_buffer;
  120. AkInt64 readPos = AkAtomicLoad64(&m_readPos);
  121. do {
  122. // see where we are in the sequence relative to where we can write data
  123. AkInt64 sequenceDelta = AkAtomicLoad64(&pBuffer[readPos & uQueueIndexMask].uSequence) - (readPos + 1);
  124. // if we're in the right spot, and we can successfully write an updated read position, break out and read the entry
  125. if (sequenceDelta == 0)
  126. {
  127. if (AkAtomicCas64(&m_readPos, readPos + 1, readPos))
  128. {
  129. break;
  130. }
  131. }
  132. // if an entry has yet to be written, bail out
  133. else if (sequenceDelta < 0)
  134. {
  135. return false;
  136. }
  137. else
  138. {
  139. // if it didn't work, reload readPos
  140. readPos = AkAtomicLoad64(&m_readPos);
  141. }
  142. } while (true);
  143. // update the acceptable sequence value for this entry
  144. io_value = pBuffer[readPos & uQueueIndexMask].value;
  145. AkAtomicStore64(&pBuffer[readPos & uQueueIndexMask].uSequence, readPos + m_uQueueIndexMask + 1);
  146. return true;
  147. }
  148. /// Checks if there is a value available to be dequeued
  149. bool Empty()
  150. {
  151. AkInt64 readPos = AkAtomicLoad64(&m_readPos);
  152. AkInt64 sequenceDelta = AkAtomicLoad64(&m_buffer[readPos & m_uQueueIndexMask].uSequence) - (readPos + 1);
  153. return sequenceDelta < 0;
  154. }
  155. private:
  156. struct FifoQueueEntry
  157. {
  158. // Value actually contained in the queue
  159. T value;
  160. // Global index of the queue entry in the sequence, to detect when we are at a valid read or write pos
  161. AkAtomic64 uSequence;
  162. };
  163. // Buffer of QueueEntries
  164. FifoQueueEntry* m_buffer;
  165. // Mask to apply to the read/write position to clamp it to array bounds
  166. AkInt64 m_uQueueIndexMask;
  167. // readIndex of where we are in the sequence
  168. AkAtomic64 m_readPos;
  169. // writeIndex of where we are in the sequence
  170. AkAtomic64 m_writePos;
  171. };
  172. #endif // _AKFIFOQUEUE_H