in Source/Task/TaskQueue.cpp [601:662]
// Blocks the calling thread until this port has something to dispatch, the
// port context reports Terminated, or the timeout expires.
//
// portContext - context whose status is polled for TaskQueuePortStatus::Terminated.
// timeout     - upper bound, in milliseconds, for the whole call. On Windows
//               the value INFINITE waits with no deadline.
//
// Returns true if the queue list or the termination list is non-empty when
// the wait ends; false otherwise.
//
// The timeout is tracked as a single deadline for the entire call. The wait
// loop can go around many times (a registered wait fired, or the queue event
// was signaled but the queue was already drained); each pass only waits for
// the time that is still left rather than restarting the full timeout.
bool __stdcall TaskQueuePortImpl::Wait(
    _In_ ITaskQueuePortContext* portContext,
    _In_ uint32_t timeout)
{
#ifdef _WIN32
    const bool waitForever = (timeout == INFINITE);
    const ULONGLONG startTick = GetTickCount64();
    DWORD remaining = timeout;

    while (true)
    {
        // Snapshot the event handles under the lock so the wait itself runs
        // without holding m_lock.
        StaticArray<HANDLE, PORT_EVENT_MAX> events;
        {
            std::lock_guard<std::mutex> lock(m_lock);
            events = m_events;
        }

        DWORD waitResult = WaitForMultipleObjects(events.count(), events.data(), FALSE, remaining);
        if (waitResult > WAIT_OBJECT_0 && waitResult < WAIT_OBJECT_0 + events.count())
        {
            // One of our waiters was signaled. Find it, and then process it.
            std::lock_guard<std::mutex> lock(m_lock);
            for (uint32_t idx = 0; idx < m_waits.count(); idx++)
            {
                if (m_waits[idx]->waitHandle == events[waitResult - WAIT_OBJECT_0])
                {
                    if (!AppendWaitRegistrationEntry(m_waits[idx]))
                    {
                        // If we fail adding to the queue, re-initialize our wait
                        LOG_IF_FAILED(InitializeWaitRegistration(m_waits[idx]));
                    }
                    break;
                }
            }
        }
        else if (waitResult == WAIT_OBJECT_0)
        {
            // We are using event 0 like a condition variable. It's
            // auto reset, so if nothing is in the queue we continue
            // waiting.
            if (portContext->GetStatus() == TaskQueuePortStatus::Terminated || !m_queueList->empty())
            {
                break;
            }
        }
        else
        {
            // Any other result (e.g. WAIT_TIMEOUT or WAIT_FAILED) ends the wait.
            break;
        }

        // Going around again: charge the time already spent against the
        // caller's timeout. Once nothing is left the next wait is a
        // zero-timeout poll, which returns WAIT_TIMEOUT if no event is
        // signaled and therefore ends the loop.
        if (!waitForever)
        {
            const ULONGLONG elapsed = GetTickCount64() - startTick;
            remaining = (elapsed >= timeout) ? 0 : static_cast<DWORD>(timeout - elapsed);
        }
    }
#else
    // Fixed deadline for the whole call so that wakeups which do not satisfy
    // the loop condition do not restart the timeout.
    const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout);

    while (m_queueList->empty() && portContext->GetStatus() != TaskQueuePortStatus::Terminated)
    {
        std::unique_lock<std::mutex> lock(m_lock);
        if (m_event.wait_until(lock, deadline) == std::cv_status::timeout)
        {
            break;
        }
    }
#endif
    return !m_queueList->empty() || !m_terminationList->empty();
}