in IORequestGenerator/IORequestGenerator.cpp [832:922]
// Worker loop for a thread that drives its I/O requests through an I/O completion port.
//
// Every request in p->vIORequest starts out in a local queue of "ready" OVERLAPPED
// structures. Each pass of the loop issues the ready requests (holding back those whose
// throughput meter reports a non-zero sleep time), then polls the completion port for one
// finished I/O and returns its OVERLAPPED to the ready queue.
//
// p               - per-thread parameters; must not be null
// hCompletionPort - completion port to poll; must not be null
//
// Returns true when the loop ends because g_bRun was cleared or g_bThreadError was set;
// returns false if issuing an I/O or dequeuing a completion failed (the error is printed).
static bool doWorkUsingIOCompletionPorts(ThreadParameters *p, HANDLE hCompletionPort)
{
    assert(nullptr != p);
    assert(nullptr != hCompletionPort);

    // Sentinel meaning "no throttled request asked for a delay during this pass".
    const DWORD dwNoSleepRequested = ~static_cast<DWORD>(0);

    BOOL rslt = FALSE;
    OVERLAPPED *pCompletedOvrp = nullptr;
    ULONG_PTR ulCompletionKey = 0;
    DWORD dwBytesTransferred = 0;
    OverlappedQueue overlappedQueue;

    // Initially every request is ready to be issued.
    const size_t cIORequests = p->vIORequest.size();
    for (size_t i = 0; i < cIORequests; i++)
    {
        overlappedQueue.Add(p->vIORequest[i].GetOverlapped());
    }

    //
    // perform work
    //
    while (g_bRun && !g_bThreadError)
    {
        DWORD dwMinSleepTime = dwNoSleepRequested;

        // Snapshot the number of ready requests before the pass. The body removes entries
        // (and sometimes puts them back), so re-reading GetCount() in the loop condition
        // would end the pass after roughly half of the ready requests had been visited.
        // NOTE(review): this relies on entries re-added during the pass being handed out
        // after the ones already queued (FIFO) - confirm against OverlappedQueue.
        const size_t cReady = overlappedQueue.GetCount();
        for (size_t i = 0; i < cReady; i++)
        {
            OVERLAPPED *pReadyOverlapped = overlappedQueue.Remove();
            IORequest *pIORequest = IORequest::OverlappedToIORequest(pReadyOverlapped);
            Target *pTarget = pIORequest->GetNextTarget();

            if (p->vThroughputMeters.size() != 0)
            {
                // Throughput meters are looked up by the target's index within p->vTargets.
                const size_t iTarget = pTarget - &p->vTargets[0];
                ThroughputMeter *pThroughputMeter = &p->vThroughputMeters[iTarget];

                const DWORD dwSleepTime = pThroughputMeter->GetSleepTime();
                if (pThroughputMeter->IsRunning() && dwSleepTime > 0)
                {
                    // Not allowed to issue yet: remember the shortest requested delay and
                    // keep the request in the ready queue for a later pass.
                    dwMinSleepTime = min(dwMinSleepTime, dwSleepTime);
                    overlappedQueue.Add(pReadyOverlapped);
                    continue;
                }
            }

            rslt = issueNextIO(p, pIORequest, &dwBytesTransferred, false);
            if (!rslt)
            {
                // Read the error code once so the value tested is the value reported.
                const DWORD dwError = GetLastError();
                if (dwError != ERROR_IO_PENDING)
                {
                    const UINT32 iIORequest = static_cast<UINT32>(pIORequest - &p->vIORequest[0]);
                    PrintError("t[%u] error during %s (error code: %u)\n",
                               iIORequest,
                               (pIORequest->GetIoType() == IOOperation::ReadIO ? "read" : "write"),
                               dwError);
                    return false;
                }
            }

            if (rslt && pTarget->GetMemoryMappedIoMode() == MemoryMappedIoMode::On)
            {
                // The memory-mapped request succeeded immediately: complete it here and
                // make it ready again.
                completeIO(p, pIORequest, dwBytesTransferred);
                overlappedQueue.Add(pReadyOverlapped);
            }
        }

        // if no IOs are in flight, wait for the next scheduling time
        if ((overlappedQueue.GetCount() == p->vIORequest.size()) && dwMinSleepTime != dwNoSleepRequested)
        {
            Sleep(dwMinSleepTime);
        }

        // Wait up to 1 ms for one of the IO operations to finish; a timeout simply starts
        // the next pass.
        if (GetQueuedCompletionStatus(hCompletionPort, &dwBytesTransferred, &ulCompletionKey, &pCompletedOvrp, 1) != 0)
        {
            // Map the OVERLAPPED back to its request, account for the completed IO and
            // make the request ready to be issued again.
            IORequest *pIORequest = IORequest::OverlappedToIORequest(pCompletedOvrp);
            completeIO(p, pIORequest, dwBytesTransferred);
            overlappedQueue.Add(pCompletedOvrp);
        }
        else
        {
            const DWORD err = GetLastError();
            if (err != WAIT_TIMEOUT)
            {
                PrintError("error during overlapped IO operation (error code: %u)\n", err);
                return false;
            }
        }
    } // end work loop

    return true;
}