WwiseExecutionQueue.cpp 15 KB


  1. /*******************************************************************************
  2. The content of this file includes portions of the proprietary AUDIOKINETIC Wwise
  3. Technology released in source code form as part of the game integration package.
  4. The content of this file may not be used without valid licenses to the
  5. AUDIOKINETIC Wwise Technology.
  6. Note that the use of the game engine is subject to the Unreal(R) Engine End User
  7. License Agreement at https://www.unrealengine.com/en-US/eula/unreal
  8. License Usage
  9. Licensees holding valid licenses to the AUDIOKINETIC Wwise Technology may use
  10. this file in accordance with the end user license agreement provided with the
  11. software or, alternatively, in accordance with the terms contained
  12. in a written agreement between you and Audiokinetic Inc.
  13. Copyright (c) 2023 Audiokinetic Inc.
  14. *******************************************************************************/
  15. #include "Wwise/WwiseExecutionQueue.h"
  16. #include "Async/TaskGraphInterfaces.h"
  17. #include "HAL/Event.h"
  18. #include "Misc/IQueuedWork.h"
  19. #include "Wwise/WwiseConcurrencyModule.h"
  20. #include "Wwise/Stats/AsyncStats.h"
  21. #include "Wwise/Stats/Concurrency.h"
  22. #include <inttypes.h>
  23. const bool FWwiseExecutionQueue::Test::bExtremelyVerbose{ false };
  24. WWISE_EXECUTIONQUEUE_TEST_CONST bool FWwiseExecutionQueue::Test::bMockEngineDeletion{ false };
  25. WWISE_EXECUTIONQUEUE_TEST_CONST bool FWwiseExecutionQueue::Test::bMockEngineDeleted{ false };
  26. WWISE_EXECUTIONQUEUE_TEST_CONST bool FWwiseExecutionQueue::Test::bMockSleepOnStateUpdate{ false };
  27. WWISE_EXECUTIONQUEUE_TEST_CONST bool FWwiseExecutionQueue::Test::bReduceLogVerbosity{ false };
  28. struct FWwiseExecutionQueue::TLS
  29. {
  30. static thread_local FWwiseExecutionQueue* CurrentExecutionQueue;
  31. };
  32. thread_local FWwiseExecutionQueue* FWwiseExecutionQueue::TLS::CurrentExecutionQueue{ nullptr };
  33. FWwiseExecutionQueue::FWwiseExecutionQueue(const TCHAR* InDebugName, EWwiseTaskPriority InTaskPriority) :
  34. #if ENABLE_NAMED_EVENTS || !NO_LOGGING
  35. DebugName(InDebugName),
  36. #else
  37. DebugName(TEXT("")),
  38. #endif
  39. TaskPriority(InTaskPriority)
  40. {
  41. ASYNC_INC_DWORD_STAT(STAT_WwiseExecutionQueues);
  42. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue (%p \"%s\") [%" PRIi32 "]: Creating with task priority %d"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (int)InTaskPriority);
  43. }
  44. FWwiseExecutionQueue::~FWwiseExecutionQueue()
  45. {
  46. UE_CLOG(UNLIKELY(bDeleteOnceClosed && WorkerState.load(std::memory_order_seq_cst) != EWorkerState::Closed), LogWwiseConcurrency, Fatal, TEXT("Deleting FWwiseExectionQueue twice!"));
  47. Close();
  48. ASYNC_DEC_DWORD_STAT(STAT_WwiseExecutionQueues);
  49. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::~FWwiseExecutionQueue(%p \"%s\") [%" PRIi32 "]: Deleted Execution Queue"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
  50. }
  51. void FWwiseExecutionQueue::Async(const TCHAR* InDebugName, FBasicFunction&& InFunction)
  52. {
  53. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Async(%p \"%s\") [%" PRIi32 "]: Enqueuing async function %p"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
  54. if (UNLIKELY(IsBeingClosed() || !OpQueue.Enqueue(FOpQueueItem(InDebugName, MoveTemp(InFunction)))))
  55. {
  56. ASYNC_INC_DWORD_STAT(STAT_WwiseExecutionQueueAsyncCalls);
  57. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Async(%p \"%s\") [%" PRIi32 "]: Executing async function %p"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
  58. InFunction();
  59. return;
  60. }
  61. StartWorkerIfNeeded();
  62. }
  63. void FWwiseExecutionQueue::AsyncAlways(const TCHAR* InDebugName, FBasicFunction&& InFunction)
  64. {
  65. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncAlways(%p \"%s\") [%" PRIi32 "]: Enqueuing async always function %p"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
  66. Async(InDebugName, [this, CallerThreadId = FPlatformTLS::GetCurrentThreadId(), InFunction = MoveTemp(InFunction)]() mutable
  67. {
  68. if (CallerThreadId == FPlatformTLS::GetCurrentThreadId())
  69. {
  70. LaunchWwiseTask(WWISECONCURRENCY_ASYNC_NAME("FWwiseExecutionQueue::AsyncAlways"), [this, InFunction = MoveTemp(InFunction)]
  71. {
  72. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncAlways(%p \"%s\") [%" PRIi32 "]: Executing function %p in TaskGraph"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
  73. InFunction();
  74. });
  75. }
  76. else
  77. {
  78. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncAlways(%p \"%s\") [%" PRIi32 "]: Executing function %p in worker thread"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
  79. InFunction();
  80. }
  81. });
  82. }
  83. void FWwiseExecutionQueue::AsyncWait(const TCHAR* InDebugName, FBasicFunction&& InFunction)
  84. {
  85. SCOPED_WWISECONCURRENCY_EVENT_4(TEXT("FWwiseExecutionQueue::AsyncWait"));
  86. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncWait(%p \"%s\") [%" PRIi32 "]: Enqueuing async wait function %p"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
  87. FEventRef Event(EEventMode::ManualReset);
  88. if (UNLIKELY(IsBeingClosed() || !OpQueue.Enqueue(FOpQueueItem(InDebugName, [this, &Event, &InFunction] {
  89. ASYNC_INC_DWORD_STAT(STAT_WwiseExecutionQueueAsyncWaitCalls);
  90. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncWait(%p \"%s\") [%" PRIi32 "]: Executing async wait function %p"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
  91. InFunction();
  92. Event->Trigger();
  93. }))))
  94. {
  95. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncWait(%p \"%s\") [%" PRIi32 "]: Executing async wait function %p synchronously!"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
  96. InFunction();
  97. return;
  98. }
  99. StartWorkerIfNeeded();
  100. Event->Wait();
  101. }
  102. void FWwiseExecutionQueue::Close()
  103. {
  104. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Close(%p \"%s\") [%" PRIi32 "]: Closing"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
  105. if (IsBeingClosed())
  106. {
  107. return;
  108. }
  109. AsyncWait(WWISECONCURRENCY_ASYNC_NAME("FWwiseExecutionQueue::Close"), [this]
  110. {
  111. TrySetRunningWorkerToClosing();
  112. });
  113. auto State = WorkerState.load(std::memory_order_relaxed);
  114. if (State != EWorkerState::Closed)
  115. {
  116. SCOPED_WWISECONCURRENCY_EVENT_4(TEXT("FWwiseExecutionQueue::Close Waiting"));
  117. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Close(%p \"%s\") [%" PRIi32 "]: Waiting for Closed"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
  118. while (State != EWorkerState::Closed)
  119. {
  120. FPlatformProcess::Yield();
  121. State = WorkerState.load(std::memory_order_relaxed);
  122. }
  123. }
  124. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Close(%p \"%s\") [%" PRIi32 "]: Done Closing"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
  125. }
  126. void FWwiseExecutionQueue::CloseAndDelete()
  127. {
  128. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Close(%p \"%s\") [%" PRIi32 "]: Closing and Request Deleting"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
  129. bDeleteOnceClosed = true;
  130. FEvent* CloseAndDeleteFunctionReturned = FPlatformProcess::GetSynchEventFromPool(); // We must wait for the Async to be done before the Worker thread can "delete this".
  131. Async(WWISECONCURRENCY_ASYNC_NAME("FWwiseExecutionQueue::CloseAndDelete"), [CloseAndDeleteFunctionReturned, this]
  132. {
  133. if (CloseAndDeleteFunctionReturned)
  134. {
  135. CloseAndDeleteFunctionReturned->Wait();
  136. FPlatformProcess::ReturnSynchEventToPool(CloseAndDeleteFunctionReturned);
  137. }
  138. else
  139. {
  140. FPlatformProcess::Yield();
  141. }
  142. TrySetRunningWorkerToClosing();
  143. });
  144. if (CloseAndDeleteFunctionReturned)
  145. {
  146. CloseAndDeleteFunctionReturned->Trigger();
  147. }
  148. }
  149. bool FWwiseExecutionQueue::IsBeingClosed() const
  150. {
  151. const auto State = WorkerState.load(std::memory_order_seq_cst);
  152. return UNLIKELY(State == EWorkerState::Closed || State == EWorkerState::Closing);
  153. }
  154. bool FWwiseExecutionQueue::IsClosed() const
  155. {
  156. const auto State = WorkerState.load(std::memory_order_seq_cst);
  157. return State == EWorkerState::Closed;
  158. }
  159. bool FWwiseExecutionQueue::IsRunningInThisThread() const
  160. {
  161. return this == TLS::CurrentExecutionQueue;
  162. }
  163. void FWwiseExecutionQueue::StartWorkerIfNeeded()
  164. {
  165. if (!TrySetRunningWorkerToAddOp() && TrySetStoppedWorkerToRunning())
  166. {
  167. if (UNLIKELY(!IWwiseConcurrencyModule::GetModule() || Test::bMockEngineDeletion || Test::bMockEngineDeleted) &&
  168. UNLIKELY(!FTaskGraphInterface::IsRunning() || Test::bMockEngineDeleted))
  169. {
  170. UE_CLOG(!Test::bMockEngineDeleted, LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::StartWorkerIfNeeded(%p \"%s\") [%" PRIi32 "]: No Task Graph. Do tasks now"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
  171. Work();
  172. }
  173. else
  174. {
  175. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::StartWorkerIfNeeded(%p \"%s\") [%" PRIi32 "]: Starting new worker task with priority %d"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (int)TaskPriority);
  176. LaunchWwiseTask(WWISECONCURRENCY_ASYNC_NAME("FWwiseExecutionQueue::StartWorkerIfNeeded"), TaskPriority, [this]
  177. {
  178. Work();
  179. });
  180. }
  181. }
  182. }
  183. void FWwiseExecutionQueue::Work()
  184. {
  185. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Work(%p \"%s\") [%" PRIi32 "]: Started worker."), this, DebugName, FPlatformTLS::GetCurrentThreadId());
  186. do
  187. {
  188. ProcessWork();
  189. }
  190. while (KeepWorking());
  191. }
  192. bool FWwiseExecutionQueue::KeepWorking()
  193. {
  194. const auto CurrentThreadId = FPlatformTLS::GetCurrentThreadId();
  195. const auto bDeleteOnceClosedCopy = this->bDeleteOnceClosed;
  196. if (LIKELY(TrySetRunningWorkerToStopped()))
  197. {
  198. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::KeepWorking(%p \"%s\") [%" PRIi32 "]: Stopped worker."), this, DebugName, CurrentThreadId);
  199. // We don't have any more operations queued. Done.
  200. // Don't execute operations here, as the Execution Queue might be deleted here.
  201. return false;
  202. }
  203. else if (LIKELY(TrySetClosingWorkerToClosed()))
  204. {
  205. // We were exiting and we don't have operations anymore. Immediately return, as our worker is not valid at this point.
  206. // Don't do any operations here!
  207. if (bDeleteOnceClosedCopy) // We use a copy since the deletion might've already occurred
  208. {
  209. LaunchWwiseTask(WWISECONCURRENCY_ASYNC_NAME("FWwiseExecutionQueue::KeepWorking delete"), [this]
  210. {
  211. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::KeepWorking(%p \"%s\") [%" PRIi32 "]: Auto deleting on Any Thread"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
  212. delete this;
  213. });
  214. }
  215. return false;
  216. }
  217. // Error cases
  218. else if (UNLIKELY(WorkerState.load() == EWorkerState::Closed))
  219. {
  220. UE_LOG(LogWwiseConcurrency, Error, TEXT("FWwiseExecutionQueue::KeepWorking(%p \"%s\") [%" PRIi32 "]: Worker is closed, but we are still looping."), this, DebugName, CurrentThreadId);
  221. return false;
  222. }
  223. else if (UNLIKELY(WorkerState.load() == EWorkerState::Stopped))
  224. {
  225. UE_LOG(LogWwiseConcurrency, Error, TEXT("FWwiseExecutionQueue::KeepWorking(%p \"%s\") [%" PRIi32 "]: Worker is stopped, but we haven't stopped it ourselves."), this, DebugName, CurrentThreadId);
  226. return false;
  227. }
  228. // Keep running (AddOp)
  229. return true;
  230. }
  231. void FWwiseExecutionQueue::ProcessWork()
  232. {
  233. TrySetAddOpWorkerToRunning();
  234. auto* PreviousExecutionQueue = TLS::CurrentExecutionQueue;
  235. TLS::CurrentExecutionQueue = this;
  236. for (const FOpQueueItem* Op; (Op = OpQueue.Peek()) != nullptr; OpQueue.Pop())
  237. {
  238. #if ENABLE_NAMED_EVENTS
  239. SCOPED_NAMED_EVENT_TCHAR_CONDITIONAL(Op->DebugName, WwiseNamedEvents::Color3, Op->DebugName != nullptr);
  240. #endif
  241. Op->Function();
  242. }
  243. TLS::CurrentExecutionQueue = PreviousExecutionQueue;
  244. }
  245. bool FWwiseExecutionQueue::TrySetStoppedWorkerToRunning()
  246. {
  247. return TryStateUpdate(EWorkerState::Stopped, EWorkerState::Running);
  248. }
  249. bool FWwiseExecutionQueue::TrySetRunningWorkerToStopped()
  250. {
  251. return TryStateUpdate(EWorkerState::Running, EWorkerState::Stopped);
  252. }
  253. bool FWwiseExecutionQueue::TrySetRunningWorkerToAddOp()
  254. {
  255. return TryStateUpdate(EWorkerState::Running, EWorkerState::AddOp);
  256. }
  257. bool FWwiseExecutionQueue::TrySetAddOpWorkerToRunning()
  258. {
  259. return TryStateUpdate(EWorkerState::AddOp, EWorkerState::Running);
  260. }
  261. bool FWwiseExecutionQueue::TrySetRunningWorkerToClosing()
  262. {
  263. return TryStateUpdate(EWorkerState::Running, EWorkerState::Closing)
  264. || TryStateUpdate(EWorkerState::AddOp, EWorkerState::Closing)
  265. || TryStateUpdate(EWorkerState::Stopped, EWorkerState::Closing);
  266. // Warning: Try not to do operations past this method returning "true". There's a slight chance "this" might be deleted!
  267. }
  268. bool FWwiseExecutionQueue::TrySetClosingWorkerToClosed()
  269. {
  270. return TryStateUpdate(EWorkerState::Closing, EWorkerState::Closed);
  271. // Warning: NEVER do operations past this method returning "true". "this" is probably deleted!
  272. }
  273. const TCHAR* FWwiseExecutionQueue::StateName(EWorkerState State)
  274. {
  275. switch (State)
  276. {
  277. case EWorkerState::Stopped: return TEXT("Stopped");
  278. case EWorkerState::Running: return TEXT("Running");
  279. case EWorkerState::AddOp: return TEXT("AddOp");
  280. case EWorkerState::Closing: return TEXT("Closing");
  281. case EWorkerState::Closed: return TEXT("Closed");
  282. default: return TEXT("UNKNOWN");
  283. }
  284. }
  285. bool FWwiseExecutionQueue::TryStateUpdate(EWorkerState NeededState, EWorkerState WantedState)
  286. {
  287. EWorkerState PreviousState = NeededState;
  288. bool bResult = WorkerState.compare_exchange_strong(PreviousState, WantedState);
  289. bResult = bResult && PreviousState == NeededState;
  290. UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue(%p \"%s\") [%" PRIi32 "]: %s %s [%s -> %s] %s %s"), this, DebugName, FPlatformTLS::GetCurrentThreadId(),
  291. StateName(PreviousState),
  292. bResult ? TEXT("==>") : TEXT("XX>"),
  293. StateName(NeededState), StateName(WantedState),
  294. bResult ? TEXT("==>") : TEXT("XX>"),
  295. bResult ? StateName(WantedState) : StateName(PreviousState));
  296. if (UNLIKELY(Test::bMockSleepOnStateUpdate) && bResult)
  297. {
  298. FPlatformProcess::Sleep(0.0001f);
  299. }
  300. return bResult;
  301. }