123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- #include "Wwise/WwiseExecutionQueue.h"
- #include "Async/TaskGraphInterfaces.h"
- #include "HAL/Event.h"
- #include "Misc/IQueuedWork.h"
- #include "Wwise/WwiseConcurrencyModule.h"
- #include "Wwise/Stats/AsyncStats.h"
- #include "Wwise/Stats/Concurrency.h"
- #include <inttypes.h>
- const bool FWwiseExecutionQueue::Test::bExtremelyVerbose{ false };
- WWISE_EXECUTIONQUEUE_TEST_CONST bool FWwiseExecutionQueue::Test::bMockEngineDeletion{ false };
- WWISE_EXECUTIONQUEUE_TEST_CONST bool FWwiseExecutionQueue::Test::bMockEngineDeleted{ false };
- WWISE_EXECUTIONQUEUE_TEST_CONST bool FWwiseExecutionQueue::Test::bMockSleepOnStateUpdate{ false };
- WWISE_EXECUTIONQUEUE_TEST_CONST bool FWwiseExecutionQueue::Test::bReduceLogVerbosity{ false };
- struct FWwiseExecutionQueue::TLS
- {
- static thread_local FWwiseExecutionQueue* CurrentExecutionQueue;
- };
- thread_local FWwiseExecutionQueue* FWwiseExecutionQueue::TLS::CurrentExecutionQueue{ nullptr };
- FWwiseExecutionQueue::FWwiseExecutionQueue(const TCHAR* InDebugName, EWwiseTaskPriority InTaskPriority) :
- #if ENABLE_NAMED_EVENTS || !NO_LOGGING
- DebugName(InDebugName),
- #else
- DebugName(TEXT("")),
- #endif
- TaskPriority(InTaskPriority)
- {
- ASYNC_INC_DWORD_STAT(STAT_WwiseExecutionQueues);
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue (%p \"%s\") [%" PRIi32 "]: Creating with task priority %d"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (int)InTaskPriority);
- }
- FWwiseExecutionQueue::~FWwiseExecutionQueue()
- {
- UE_CLOG(UNLIKELY(bDeleteOnceClosed && WorkerState.load(std::memory_order_seq_cst) != EWorkerState::Closed), LogWwiseConcurrency, Fatal, TEXT("Deleting FWwiseExectionQueue twice!"));
- Close();
- ASYNC_DEC_DWORD_STAT(STAT_WwiseExecutionQueues);
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::~FWwiseExecutionQueue(%p \"%s\") [%" PRIi32 "]: Deleted Execution Queue"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
- }
- void FWwiseExecutionQueue::Async(const TCHAR* InDebugName, FBasicFunction&& InFunction)
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Async(%p \"%s\") [%" PRIi32 "]: Enqueuing async function %p"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
- if (UNLIKELY(IsBeingClosed() || !OpQueue.Enqueue(FOpQueueItem(InDebugName, MoveTemp(InFunction)))))
- {
- ASYNC_INC_DWORD_STAT(STAT_WwiseExecutionQueueAsyncCalls);
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Async(%p \"%s\") [%" PRIi32 "]: Executing async function %p"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
- InFunction();
- return;
- }
- StartWorkerIfNeeded();
- }
- void FWwiseExecutionQueue::AsyncAlways(const TCHAR* InDebugName, FBasicFunction&& InFunction)
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncAlways(%p \"%s\") [%" PRIi32 "]: Enqueuing async always function %p"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
- Async(InDebugName, [this, CallerThreadId = FPlatformTLS::GetCurrentThreadId(), InFunction = MoveTemp(InFunction)]() mutable
- {
- if (CallerThreadId == FPlatformTLS::GetCurrentThreadId())
- {
- LaunchWwiseTask(WWISECONCURRENCY_ASYNC_NAME("FWwiseExecutionQueue::AsyncAlways"), [this, InFunction = MoveTemp(InFunction)]
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncAlways(%p \"%s\") [%" PRIi32 "]: Executing function %p in TaskGraph"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
- InFunction();
- });
- }
- else
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncAlways(%p \"%s\") [%" PRIi32 "]: Executing function %p in worker thread"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
- InFunction();
- }
- });
- }
- void FWwiseExecutionQueue::AsyncWait(const TCHAR* InDebugName, FBasicFunction&& InFunction)
- {
- SCOPED_WWISECONCURRENCY_EVENT_4(TEXT("FWwiseExecutionQueue::AsyncWait"));
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncWait(%p \"%s\") [%" PRIi32 "]: Enqueuing async wait function %p"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
- FEventRef Event(EEventMode::ManualReset);
- if (UNLIKELY(IsBeingClosed() || !OpQueue.Enqueue(FOpQueueItem(InDebugName, [this, &Event, &InFunction] {
- ASYNC_INC_DWORD_STAT(STAT_WwiseExecutionQueueAsyncWaitCalls);
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncWait(%p \"%s\") [%" PRIi32 "]: Executing async wait function %p"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
- InFunction();
- Event->Trigger();
- }))))
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::AsyncWait(%p \"%s\") [%" PRIi32 "]: Executing async wait function %p synchronously!"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (intptr_t&)InFunction);
- InFunction();
- return;
- }
- StartWorkerIfNeeded();
- Event->Wait();
- }
- void FWwiseExecutionQueue::Close()
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Close(%p \"%s\") [%" PRIi32 "]: Closing"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
- if (IsBeingClosed())
- {
- return;
- }
- AsyncWait(WWISECONCURRENCY_ASYNC_NAME("FWwiseExecutionQueue::Close"), [this]
- {
- TrySetRunningWorkerToClosing();
- });
- auto State = WorkerState.load(std::memory_order_relaxed);
- if (State != EWorkerState::Closed)
- {
- SCOPED_WWISECONCURRENCY_EVENT_4(TEXT("FWwiseExecutionQueue::Close Waiting"));
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Close(%p \"%s\") [%" PRIi32 "]: Waiting for Closed"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
- while (State != EWorkerState::Closed)
- {
- FPlatformProcess::Yield();
- State = WorkerState.load(std::memory_order_relaxed);
- }
- }
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Close(%p \"%s\") [%" PRIi32 "]: Done Closing"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
- }
- void FWwiseExecutionQueue::CloseAndDelete()
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Close(%p \"%s\") [%" PRIi32 "]: Closing and Request Deleting"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
- bDeleteOnceClosed = true;
- FEvent* CloseAndDeleteFunctionReturned = FPlatformProcess::GetSynchEventFromPool();
-
- Async(WWISECONCURRENCY_ASYNC_NAME("FWwiseExecutionQueue::CloseAndDelete"), [CloseAndDeleteFunctionReturned, this]
- {
- if (CloseAndDeleteFunctionReturned)
- {
- CloseAndDeleteFunctionReturned->Wait();
- FPlatformProcess::ReturnSynchEventToPool(CloseAndDeleteFunctionReturned);
- }
- else
- {
- FPlatformProcess::Yield();
- }
- TrySetRunningWorkerToClosing();
- });
- if (CloseAndDeleteFunctionReturned)
- {
- CloseAndDeleteFunctionReturned->Trigger();
- }
- }
- bool FWwiseExecutionQueue::IsBeingClosed() const
- {
- const auto State = WorkerState.load(std::memory_order_seq_cst);
- return UNLIKELY(State == EWorkerState::Closed || State == EWorkerState::Closing);
- }
- bool FWwiseExecutionQueue::IsClosed() const
- {
- const auto State = WorkerState.load(std::memory_order_seq_cst);
- return State == EWorkerState::Closed;
- }
- bool FWwiseExecutionQueue::IsRunningInThisThread() const
- {
- return this == TLS::CurrentExecutionQueue;
- }
- void FWwiseExecutionQueue::StartWorkerIfNeeded()
- {
- if (!TrySetRunningWorkerToAddOp() && TrySetStoppedWorkerToRunning())
- {
- if (UNLIKELY(!IWwiseConcurrencyModule::GetModule() || Test::bMockEngineDeletion || Test::bMockEngineDeleted) &&
- UNLIKELY(!FTaskGraphInterface::IsRunning() || Test::bMockEngineDeleted))
- {
- UE_CLOG(!Test::bMockEngineDeleted, LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::StartWorkerIfNeeded(%p \"%s\") [%" PRIi32 "]: No Task Graph. Do tasks now"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
- Work();
- }
- else
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::StartWorkerIfNeeded(%p \"%s\") [%" PRIi32 "]: Starting new worker task with priority %d"), this, DebugName, FPlatformTLS::GetCurrentThreadId(), (int)TaskPriority);
- LaunchWwiseTask(WWISECONCURRENCY_ASYNC_NAME("FWwiseExecutionQueue::StartWorkerIfNeeded"), TaskPriority, [this]
- {
- Work();
- });
- }
- }
- }
- void FWwiseExecutionQueue::Work()
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::Work(%p \"%s\") [%" PRIi32 "]: Started worker."), this, DebugName, FPlatformTLS::GetCurrentThreadId());
- do
- {
- ProcessWork();
- }
- while (KeepWorking());
- }
- bool FWwiseExecutionQueue::KeepWorking()
- {
- const auto CurrentThreadId = FPlatformTLS::GetCurrentThreadId();
- const auto bDeleteOnceClosedCopy = this->bDeleteOnceClosed;
- if (LIKELY(TrySetRunningWorkerToStopped()))
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::KeepWorking(%p \"%s\") [%" PRIi32 "]: Stopped worker."), this, DebugName, CurrentThreadId);
-
-
- return false;
- }
- else if (LIKELY(TrySetClosingWorkerToClosed()))
- {
-
-
- if (bDeleteOnceClosedCopy)
- {
- LaunchWwiseTask(WWISECONCURRENCY_ASYNC_NAME("FWwiseExecutionQueue::KeepWorking delete"), [this]
- {
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue::KeepWorking(%p \"%s\") [%" PRIi32 "]: Auto deleting on Any Thread"), this, DebugName, FPlatformTLS::GetCurrentThreadId());
- delete this;
- });
- }
- return false;
- }
-
- else if (UNLIKELY(WorkerState.load() == EWorkerState::Closed))
- {
- UE_LOG(LogWwiseConcurrency, Error, TEXT("FWwiseExecutionQueue::KeepWorking(%p \"%s\") [%" PRIi32 "]: Worker is closed, but we are still looping."), this, DebugName, CurrentThreadId);
- return false;
- }
- else if (UNLIKELY(WorkerState.load() == EWorkerState::Stopped))
- {
- UE_LOG(LogWwiseConcurrency, Error, TEXT("FWwiseExecutionQueue::KeepWorking(%p \"%s\") [%" PRIi32 "]: Worker is stopped, but we haven't stopped it ourselves."), this, DebugName, CurrentThreadId);
- return false;
- }
-
- return true;
- }
- void FWwiseExecutionQueue::ProcessWork()
- {
- TrySetAddOpWorkerToRunning();
- auto* PreviousExecutionQueue = TLS::CurrentExecutionQueue;
- TLS::CurrentExecutionQueue = this;
-
- for (const FOpQueueItem* Op; (Op = OpQueue.Peek()) != nullptr; OpQueue.Pop())
- {
- #if ENABLE_NAMED_EVENTS
- SCOPED_NAMED_EVENT_TCHAR_CONDITIONAL(Op->DebugName, WwiseNamedEvents::Color3, Op->DebugName != nullptr);
- #endif
- Op->Function();
- }
- TLS::CurrentExecutionQueue = PreviousExecutionQueue;
- }
- bool FWwiseExecutionQueue::TrySetStoppedWorkerToRunning()
- {
- return TryStateUpdate(EWorkerState::Stopped, EWorkerState::Running);
- }
- bool FWwiseExecutionQueue::TrySetRunningWorkerToStopped()
- {
- return TryStateUpdate(EWorkerState::Running, EWorkerState::Stopped);
- }
- bool FWwiseExecutionQueue::TrySetRunningWorkerToAddOp()
- {
- return TryStateUpdate(EWorkerState::Running, EWorkerState::AddOp);
- }
- bool FWwiseExecutionQueue::TrySetAddOpWorkerToRunning()
- {
- return TryStateUpdate(EWorkerState::AddOp, EWorkerState::Running);
- }
- bool FWwiseExecutionQueue::TrySetRunningWorkerToClosing()
- {
- return TryStateUpdate(EWorkerState::Running, EWorkerState::Closing)
- || TryStateUpdate(EWorkerState::AddOp, EWorkerState::Closing)
- || TryStateUpdate(EWorkerState::Stopped, EWorkerState::Closing);
-
- }
- bool FWwiseExecutionQueue::TrySetClosingWorkerToClosed()
- {
- return TryStateUpdate(EWorkerState::Closing, EWorkerState::Closed);
-
- }
- const TCHAR* FWwiseExecutionQueue::StateName(EWorkerState State)
- {
- switch (State)
- {
- case EWorkerState::Stopped: return TEXT("Stopped");
- case EWorkerState::Running: return TEXT("Running");
- case EWorkerState::AddOp: return TEXT("AddOp");
- case EWorkerState::Closing: return TEXT("Closing");
- case EWorkerState::Closed: return TEXT("Closed");
- default: return TEXT("UNKNOWN");
- }
- }
- bool FWwiseExecutionQueue::TryStateUpdate(EWorkerState NeededState, EWorkerState WantedState)
- {
- EWorkerState PreviousState = NeededState;
- bool bResult = WorkerState.compare_exchange_strong(PreviousState, WantedState);
- bResult = bResult && PreviousState == NeededState;
- UE_CLOG(Test::IsExtremelyVerbose(), LogWwiseConcurrency, VeryVerbose, TEXT("FWwiseExecutionQueue(%p \"%s\") [%" PRIi32 "]: %s %s [%s -> %s] %s %s"), this, DebugName, FPlatformTLS::GetCurrentThreadId(),
- StateName(PreviousState),
- bResult ? TEXT("==>") : TEXT("XX>"),
- StateName(NeededState), StateName(WantedState),
- bResult ? TEXT("==>") : TEXT("XX>"),
- bResult ? StateName(WantedState) : StateName(PreviousState));
- if (UNLIKELY(Test::bMockSleepOnStateUpdate) && bResult)
- {
- FPlatformProcess::Sleep(0.0001f);
- }
- return bResult;
- }
|