static bool doWorkUsingIOCompletionPorts()

in IORequestGenerator/IORequestGenerator.cpp [832:922]


//
// Worker loop for a thread that issues its IO requests asynchronously and
// collects their completions from an IO completion port.
//
// Every IORequest in p->vIORequest starts out in a local "ready" queue. Each
// pass of the work loop:
//   1. takes every request that was ready at the start of the pass and either
//      issues its next IO or, when the target's throughput meter reports a
//      nonzero sleep time, puts it back in the queue for a later pass;
//   2. sleeps until the earliest reported scheduling time if every request is
//      still sitting in the queue (nothing was issued);
//   3. waits up to 1 ms for one completion and, if one arrives, completes the
//      request and returns it to the ready queue.
//
// Parameters:
//   p               - per-thread state (IO requests, targets, throughput meters);
//                     must not be null.
//   hCompletionPort - completion port the IOs are reaped from; must not be null.
//
// Returns true when the loop ends because g_bRun became false or
// g_bThreadError became true; returns false when issuing an IO fails with
// anything other than ERROR_IO_PENDING, or when GetQueuedCompletionStatus
// fails with anything other than WAIT_TIMEOUT (an error is printed in both
// cases).
//
static bool doWorkUsingIOCompletionPorts(ThreadParameters *p, HANDLE hCompletionPort)
{
    assert(nullptr != p);
    assert(nullptr != hCompletionPort);

    OVERLAPPED *pCompletedOvrp = nullptr;
    ULONG_PTR ulCompletionKey = 0;      // required out-parameter; the value is not used here
    DWORD dwBytesTransferred = 0;
    OverlappedQueue overlappedQueue;
    const size_t cIORequests = p->vIORequest.size();

    // every request starts out ready to be issued
    for (size_t i = 0; i < cIORequests; i++)
    {
        overlappedQueue.Add(p->vIORequest[i].GetOverlapped());
    }

    //
    // perform work
    //
    while (g_bRun && !g_bThreadError)
    {
        // all-ones means "no throttled request reported a sleep time during this pass"
        DWORD dwMinSleepTime = ~((DWORD)0);

        // Snapshot the number of ready requests before draining. Remove() shrinks the
        // queue, so comparing the index against the live count would stop the pass
        // after roughly half of the ready requests and leave the rest waiting for the
        // next completion or timeout.
        // NOTE(review): relies on OverlappedQueue handing entries back in FIFO order,
        // so requests re-added below are not visited twice in one pass - confirm.
        const size_t cReady = overlappedQueue.GetCount();
        for (size_t i = 0; i < cReady; i++)
        {
            OVERLAPPED *pReadyOverlapped = overlappedQueue.Remove();
            IORequest *pIORequest = IORequest::OverlappedToIORequest(pReadyOverlapped);
            Target *pTarget = pIORequest->GetNextTarget();

            if (p->vThroughputMeters.size() != 0)
            {
                // throughput meters are indexed by the target's position in p->vTargets
                size_t iTarget = pTarget - &p->vTargets[0];
                ThroughputMeter *pThroughputMeter = &p->vThroughputMeters[iTarget];

                DWORD dwSleepTime = pThroughputMeter->GetSleepTime();
                if (pThroughputMeter->IsRunning() && dwSleepTime > 0)
                {
                    // not yet time for this target: remember the shortest wait and
                    // keep the request queued for a later pass
                    dwMinSleepTime = min(dwMinSleepTime, dwSleepTime);
                    overlappedQueue.Add(pReadyOverlapped);
                    continue;
                }
            }

            const BOOL rslt = issueNextIO(p, pIORequest, &dwBytesTransferred, false);

            if (!rslt)
            {
                // read the error code once, right after the failing call, so the value
                // that is checked is also the value that is reported
                const DWORD dwIssueError = GetLastError();
                if (dwIssueError != ERROR_IO_PENDING)
                {
                    UINT32 iIORequest = static_cast<UINT32>(pIORequest - &p->vIORequest[0]);
                    PrintError("t[%u] error during %s (error code: %u)\n",
                               iIORequest,
                               (pIORequest->GetIoType() == IOOperation::ReadIO ? "read" : "write"),
                               dwIssueError);
                    return false;
                }
            }
            else if (pTarget->GetMemoryMappedIoMode() == MemoryMappedIoMode::On)
            {
                // the IO finished inside issueNextIO; complete it here and make the
                // request ready again
                // NOTE(review): presumably no completion packet is posted for
                // memory-mapped IO - confirm against issueNextIO.
                completeIO(p, pIORequest, dwBytesTransferred);
                overlappedQueue.Add(pReadyOverlapped);
            }
        }

        // if no IOs are in flight, wait for the next scheduling time
        if ((overlappedQueue.GetCount() == p->vIORequest.size()) && dwMinSleepTime != ~((DWORD)0))
        {
            Sleep(dwMinSleepTime);
        }

        // wait (at most 1 ms) for one of the IO operations to finish; the short timeout
        // brings the loop back around to re-check the run flags and throttled requests
        if (GetQueuedCompletionStatus(hCompletionPort, &dwBytesTransferred, &ulCompletionKey, &pCompletedOvrp, 1) != 0)
        {
            // map the OVERLAPPED back to its IORequest, complete it, and make it ready again
            IORequest *pIORequest = IORequest::OverlappedToIORequest(pCompletedOvrp);
            completeIO(p, pIORequest, dwBytesTransferred);
            overlappedQueue.Add(pCompletedOvrp);
        }
        else
        {
            const DWORD err = GetLastError();
            if (err != WAIT_TIMEOUT)
            {
                PrintError("error during overlapped IO operation (error code: %u)\n", err);
                return false;
            }
        }
    } // end work loop

    return true;
}