bool IORequestGenerator::_GenerateRequestsForTimeSpan()

in IORequestGenerator/IORequestGenerator.cpp [2007:2616]


bool IORequestGenerator::_GenerateRequestsForTimeSpan(const Profile& profile, const TimeSpan& timeSpan, Results& results, struct Synchronization *pSynch)
{
    //FUTURE EXTENSION: add new I/O capabilities presented in Longhorn
    //FUTURE EXTENSION: add a check if the folder is compressed (cache is always enabled in case of compressed folders)

    //check if I/O request generator is already running
    LONG lGenState = InterlockedExchange(&g_lGeneratorRunning, 1);
    if (1 == lGenState)
    {
        PrintError("FATAL ERROR: I/O Request Generator already running\n");
        return false;
    }

    //initialize all global parameters (in case of second run, after the first one is finished)
    _InitializeGlobalParameters();

    HANDLE hStartEvent = nullptr;                       // start event (used to inform the worker threads that they should start the work)
    HANDLE hEndEvent = nullptr;                         // end event (used only in case of completin routines (not for IO Completion Ports))

    memset(&g_EtwEventCounters, 0, sizeof(struct ETWEventCounters));  // reset all etw event counters

    bool fUseETW = profile.GetEtwEnabled();            //true if user wants ETW

    //
    // load dlls
    //
    assert(nullptr == _hNTDLL);
    if (!_LoadDLLs())
    {
        PrintError("Error loading NtQuerySystemInformation\n");
        return false;
    }

    //FUTURE EXTENSION: check for conflicts in alignment (when cache is turned off only sector aligned I/O are permitted)
    //FUTURE EXTENSION: check if file sizes are enough to have at least first requests not wrapping around

    Random r;
    vector<Target> vTargets = timeSpan.GetTargets();
    // allocate memory for random data write buffers
    for (auto i = vTargets.begin(); i != vTargets.end(); i++)
    {
        if ((i->GetRandomDataWriteBufferSize() > 0) && !i->AllocateAndFillRandomDataWriteBuffer(&r))
        {
            return false;
        }
    }

    // check if user wanted to create a file
    for (auto i = vTargets.begin(); i != vTargets.end(); i++)
    {
        if ((i->GetFileSize() > 0) && (i->GetPrecreated() == false))
        {
            string str = i->GetPath();
            if (str.empty())
            {
                PrintError("You have to provide a filename\n");
                return false;
            }

            //skip physical drives and partitions
            if ('#' == str[0] || (':' == str[1] && '\0' == str[2]))
            {
                continue;
            }

            //create only regular files
            if (!_CreateFile(i->GetFileSize(), str.c_str(), i->GetZeroWriteBuffers(), profile.GetVerbose()))
            {
                return false;
            }
        }
    }

    // get thread count
    UINT32 cThreads = timeSpan.GetThreadCount();
    if (cThreads < 1)
    {
        for (auto i = vTargets.begin(); i != vTargets.end(); i++)
        {
            cThreads += i->GetThreadsPerFile();
        }
    }

    // allocate memory for thread handles
    vector<HANDLE> vhThreads(cThreads);

    //
    // allocate memory for performance counters
    //
    vector<SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION> vPerfInit(g_SystemInformation.processorTopology._ulProcCount);
    vector<SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION> vPerfDone(g_SystemInformation.processorTopology._ulProcCount);
    vector<SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION> vPerfDiff(g_SystemInformation.processorTopology._ulProcCount);

    //
    //create start event
    //

    hStartEvent = CreateEvent(NULL, TRUE, FALSE, "");
    if (NULL == hStartEvent)
    {
        PrintError("Error creating the start event\n");
        return false;
    }

    //
    // create end event
    //
    if (timeSpan.GetCompletionRoutines())
    {
        hEndEvent = CreateEvent(NULL, TRUE, FALSE, "");
        if (NULL == hEndEvent)
        {
            PrintError("Error creating the end event\n");
            return false;
        }
    }

    //
    // set to high priority to ensure the controller thread gets to run immediately
    // when signalled.
    //

    SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);

    //
    // create the threads
    //

    g_bRun = TRUE;

    // gather affinity information, and move to the first active processor
    const auto& vAffinity = timeSpan.GetAffinityAssignments();
    WORD wGroupCtr = 0;
    BYTE bProcCtr = 0;
    g_SystemInformation.processorTopology.GetActiveGroupProcessor(wGroupCtr, bProcCtr, false);

    volatile bool fAccountingOn = false;
    UINT64 ullStartTime;    //start time
    UINT64 ullTimeDiff;  //elapsed test time (in units returned by QueryPerformanceCounter)
    vector<UINT64> vullSharedSequentialOffsets(vTargets.size(), 0);

    results.vThreadResults.clear();
    results.vThreadResults.resize(cThreads);
    for (UINT32 iThread = 0; iThread < cThreads; ++iThread)
    {
        PrintVerbose(profile.GetVerbose(), "creating thread %u\n", iThread);
        ThreadParameters *cookie = new ThreadParameters();  // threadFunc is going to free the memory
        if (nullptr == cookie)
        {
            PrintError("FATAL ERROR: could not allocate memory\n");
            _AbortWorkerThreads(hStartEvent, vhThreads);
            return false;
        }

        // each thread has a different random seed
        Random *pRand = new Random(timeSpan.GetRandSeed() + iThread);
        if (nullptr == pRand)
        {
            PrintError("FATAL ERROR: could not allocate memory\n");
            _AbortWorkerThreads(hStartEvent, vhThreads);
            delete cookie;
            return false;
        }

        UINT32 ulRelativeThreadNo = 0;

        if (timeSpan.GetThreadCount() > 0)
        {
            // fixed thread mode: threads operate on specified files
            // and receive the entire seq index array.
            // relative thread number is the same as thread number.
            cookie->pullSharedSequentialOffsets = &vullSharedSequentialOffsets[0];
            ulRelativeThreadNo = iThread;
            for (auto i = vTargets.begin();
                 i != vTargets.end();
                 i++)
            {
                const vector<ThreadTarget> vThreadTargets = i->GetThreadTargets();

                // no thread targets specified - add to all threads
                if (vThreadTargets.size() == 0)
                {
                    cookie->vTargets.push_back(*i);
                }
                else
                {
                    // check if the target should be added to the current thread
                    for (UINT32 iThreadTarget = 0; iThreadTarget < vThreadTargets.size(); iThreadTarget++)
                    {
                        if (vThreadTargets[iThreadTarget].GetThread() == iThread)
                        {
                            // confirm copy constructor?
                            cookie->vTargets.push_back(*i);
                            break;
                        }
                    }
                }
            }
        }
        else
        {
            size_t cAssignedThreads = 0;
            size_t cBaseThread = 0;
            auto psi = vullSharedSequentialOffsets.begin();
            for (auto i = vTargets.begin();
                 i != vTargets.end();
                 i++, psi++)
            {
                // per-file thread mode: groups of threads operate on individual files
                // and receive the specific seq index for their file (note: singular).
                // loop up through the targets to assign thread n to the appropriate file.
                // relative thread number is file-relative, so keep track of the base
                // thread number for the file and calculate relative to that.
                //
                // ex: two files, two threads per file
                //  t0: rt0 for f0 (cAssigned = 2, cBase = 0)
                //  t1: rt1 for f0 (cAssigned = 2, cBase = 0)
                //  t2: rt0 for f1 (cAssigned = 4, cBase = 2)
                //  t3: rt1 for f1 (cAssigned = 4, cBase = 2)

                cAssignedThreads += i->GetThreadsPerFile();
                if (iThread < cAssignedThreads)
                {
                    // confirm copy constructor?
                    cookie->vTargets.push_back(*i);
                    cookie->pullSharedSequentialOffsets = &(*psi);
                    ulRelativeThreadNo = (iThread - cBaseThread) % i->GetThreadsPerFile();

                    PrintVerbose(profile.GetVerbose(), "thread %u is relative thread %u for %s\n", iThread, ulRelativeThreadNo, i->GetPath().c_str());
                    break;
                }
                cBaseThread += i->GetThreadsPerFile();
            }
        }

        cookie->pProfile = &profile;
        cookie->pTimeSpan = &timeSpan;
        cookie->hStartEvent = hStartEvent;
        cookie->hEndEvent = hEndEvent;
        cookie->ulThreadNo = iThread;
        cookie->ulRelativeThreadNo = ulRelativeThreadNo;
        cookie->pfAccountingOn = &fAccountingOn;
        cookie->pullStartTime = &ullStartTime;
        cookie->ulRandSeed = timeSpan.GetRandSeed() + iThread;  // each thread has a different random seed
        cookie->pRand = pRand;

        //Set thread group and proc affinity

        // Default: Round robin cores in order of groups, starting at group 0.
        //          Fill each group before moving to next.
        if (vAffinity.size() == 0)
        {
            cookie->wGroupNum = wGroupCtr;
            cookie->bProcNum = bProcCtr;

            // advance to next active
            g_SystemInformation.processorTopology.GetActiveGroupProcessor(wGroupCtr, bProcCtr, true);
        }
        // Assigned affinity. Round robin through the assignment list.
        else
        {
            ULONG i = iThread % vAffinity.size();

            cookie->wGroupNum = vAffinity[i].wGroup;
            cookie->bProcNum = vAffinity[i].bProc;
        }

        //create thread
        cookie->pResults = &results.vThreadResults[iThread];

        InterlockedIncrement(&g_lRunningThreadsCount);
        DWORD dwThreadId;
        HANDLE hThread = CreateThread(NULL, 64 * 1024, threadFunc, cookie, 0, &dwThreadId);
        if (NULL == hThread)
        {
            //in case of error terminate running worker threads
            PrintError("ERROR: unable to create thread (error code: %u)\n", GetLastError());
            InterlockedDecrement(&g_lRunningThreadsCount);
            _AbortWorkerThreads(hStartEvent, vhThreads);
            delete pRand;
            delete cookie;
            return false;
        }

        //store handle to the thread
        vhThreads[iThread] = hThread;
    }

    if (STRUCT_SYNCHRONIZATION_SUPPORTS(pSynch, hStartEvent) && (NULL != pSynch->hStartEvent))
    {
        if (WAIT_OBJECT_0 != WaitForSingleObject(pSynch->hStartEvent, INFINITE))
        {
            PrintError("Error during WaitForSingleObject\n");
            _AbortWorkerThreads(hStartEvent, vhThreads);
            return false;
        }
    }

    //
    // get cycle count (it will be used to calculate actual work time)
    //
    DWORD dwWaitStatus = 0;

    //bAccountingOn = FALSE; // clear the accouning flag so that threads didn't count what they do while in the warmup phase

    BOOL bSynchStop = STRUCT_SYNCHRONIZATION_SUPPORTS(pSynch, hStopEvent) && (NULL != pSynch->hStopEvent);
    BOOL bBreak = FALSE;
    PEVENT_TRACE_PROPERTIES pETWSession = NULL;

    PrintVerbose(profile.GetVerbose(), "starting warm up...\n");
    //
    // send start signal
    //
    if (!SetEvent(hStartEvent))
    {
        PrintError("Error signaling start event\n");
        //        stopETW(bUseETW, hTraceSession);
        _TerminateWorkerThreads(vhThreads);    //FUTURE EXTENSION: timeout for worker threads
        return false;
    }

    //
    // wait specified amount of time in each phase (warm up, test, cool down)
    //
    if (timeSpan.GetWarmup() > 0)
    {
        TraceLoggingActivity<g_hEtwProvider, DISKSPD_TRACE_INFO, TRACE_LEVEL_NONE> WarmActivity;
        TraceLoggingWriteStart(WarmActivity, "Warm Up");

        if (bSynchStop)
        {
            assert(NULL != pSynch->hStopEvent);
            dwWaitStatus = WaitForSingleObject(pSynch->hStopEvent, 1000 * timeSpan.GetWarmup());
            if (WAIT_OBJECT_0 != dwWaitStatus && WAIT_TIMEOUT != dwWaitStatus)
            {
                PrintError("Error during WaitForSingleObject\n");
                _TerminateWorkerThreads(vhThreads);
                return false;
            }
            bBreak = (WAIT_TIMEOUT != dwWaitStatus);
        }
        else
        {
            Sleep(1000 * timeSpan.GetWarmup());
        }

        TraceLoggingWriteStop(WarmActivity, "Warm Up");
    }

    if (!bBreak) // proceed only if user didn't break the test
    {
        //FUTURE EXTENSION: starting ETW session shouldn't be done brutally here, should be done before warmup and here just a fast signal to start logging (see also stopping ETW session)
        //FUTURE EXTENSION: put an ETW mark here, for easier parsing by external tools

        //
        // start etw session
        //
        TRACEHANDLE hTraceSession = NULL;
        if (fUseETW)
        {
            PrintVerbose(profile.GetVerbose(), "starting trace session\n");
            hTraceSession = StartETWSession(profile);
            if (NULL == hTraceSession)
            {
                PrintError("Could not start ETW session\n");
                _TerminateWorkerThreads(vhThreads);
                return false;
            }

            if (NULL == CreateThread(NULL, 64 * 1024, etwThreadFunc, NULL, 0, NULL))
            {
                PrintError("Warning: unable to create thread for ETW session\n");
                _TerminateWorkerThreads(vhThreads);
                return false;
            }
            PrintVerbose(profile.GetVerbose(), "tracing events\n");
        }

        PrintVerbose(profile.GetVerbose(), "starting measurements...\n");

        //
        // notify the front-end that the test is about to start;
        // do it before starting timing in order not to perturb measurements
        //
        if (STRUCT_SYNCHRONIZATION_SUPPORTS(pSynch, pfnCallbackTestStarted) && (NULL != pSynch->pfnCallbackTestStarted))
        {
            pSynch->pfnCallbackTestStarted();
        }

        //
        // read performance counters
        //
        if (_GetSystemPerfInfo(&vPerfInit[0], g_SystemInformation.processorTopology._ulProcCount) == FALSE)
        {
            PrintError("Error reading performance counters\n");
            _StopETW(fUseETW, hTraceSession);
            _TerminateWorkerThreads(vhThreads);
            return false;
        }

        TraceLoggingActivity<g_hEtwProvider, DISKSPD_TRACE_INFO, TRACE_LEVEL_NONE> RunActivity;
        TraceLoggingWriteStart(RunActivity, "Run Time");

        //get cycle count (it will be used to calculate actual work time)
        ullStartTime = PerfTimer::GetTime();

#pragma warning( push )
#pragma warning( disable : 28931 )
        fAccountingOn = true;
#pragma warning( pop )

        assert(timeSpan.GetDuration() > 0);
        if (bSynchStop)
        {
            assert(NULL != pSynch->hStopEvent);
            dwWaitStatus = WaitForSingleObject(pSynch->hStopEvent, 1000 * timeSpan.GetDuration());
            if (WAIT_OBJECT_0 != dwWaitStatus && WAIT_TIMEOUT != dwWaitStatus)
            {
                PrintError("Error during WaitForSingleObject\n");
                _StopETW(fUseETW, hTraceSession);
                _TerminateWorkerThreads(vhThreads);    //FUTURE EXTENSION: worker threads should have a chance to free allocated memory (see also other places calling terminateWorkerThreads())
                return FALSE;
            }
            bBreak = (WAIT_TIMEOUT != dwWaitStatus);
        }
        else
        {
            Sleep(1000 * timeSpan.GetDuration());
        }

        fAccountingOn = false;

        //get cycle count and perf counters
        ullTimeDiff = PerfTimer::GetTime() - ullStartTime;

        TraceLoggingWriteStop(RunActivity, "Run Time");

        if (_GetSystemPerfInfo(&vPerfDone[0], g_SystemInformation.processorTopology._ulProcCount) == FALSE)
        {
            PrintError("Error getting performance counters\n");
            _StopETW(fUseETW, hTraceSession);
            _TerminateWorkerThreads(vhThreads);
            return false;
        }

        //
        // notify the front-end that the test has just finished;
        // do it after stopping timing in order not to perturb measurements
        //
        if (STRUCT_SYNCHRONIZATION_SUPPORTS(pSynch, pfnCallbackTestFinished) && (NULL != pSynch->pfnCallbackTestFinished))
        {
            pSynch->pfnCallbackTestFinished();
        }

        //
        // stop etw session
        //
        if (fUseETW)
        {
            PrintVerbose(profile.GetVerbose(), "stopping ETW session\n");
            pETWSession = StopETWSession(hTraceSession);
            if (NULL == pETWSession)
            {
                PrintError("Error stopping ETW session\n");
                return false;
            }
        }
    }
    else
    {
        ullTimeDiff = 0; // mark that no test was run
    }

    PrintVerbose(profile.GetVerbose(), "starting cool down...\n");
    if ((timeSpan.GetCooldown() > 0) && !bBreak)
    {
        TraceLoggingActivity<g_hEtwProvider, DISKSPD_TRACE_INFO, TRACE_LEVEL_NONE> CoolActivity;
        TraceLoggingWriteStart(CoolActivity, "Cool Down");

        if (bSynchStop)
        {
            assert(NULL != pSynch->hStopEvent);
            dwWaitStatus = WaitForSingleObject(pSynch->hStopEvent, 1000 * timeSpan.GetCooldown());
            if (WAIT_OBJECT_0 != dwWaitStatus && WAIT_TIMEOUT != dwWaitStatus)
            {
                PrintError("Error during WaitForSingleObject\n");
                //                stopETW(bUseETW, hTraceSession);
                _TerminateWorkerThreads(vhThreads);
                return false;
            }
        }
        else
        {
            Sleep(1000 * timeSpan.GetCooldown());
        }

        TraceLoggingWriteStop(CoolActivity, "Cool Down");
    }
    PrintVerbose(profile.GetVerbose(), "finished test...\n");

    //
    // signal the threads to finish
    //
    g_bRun = FALSE;
    if (timeSpan.GetCompletionRoutines())
    {
        if (!SetEvent(hEndEvent))
        {
            PrintError("Error signaling end event\n");
            //            stopETW(bUseETW, hTraceSession);
            return false;
        }
    }

    //
    // wait till all of the threads finish
    //
#pragma warning( push )
#pragma warning( disable : 28112 )
    while (g_lRunningThreadsCount > 0)
    {
        Sleep(10);    //FUTURE EXTENSION: a timeout should be implemented
    }
#pragma warning( pop )


    //check if there has been an error during threads execution
    if (g_bThreadError)
    {
        PrintError("There has been an error during threads execution\n");
        return false;
    }

    //
    // close events' handles
    //
    CloseHandle(hStartEvent);
    hStartEvent = NULL;

    if (NULL != hEndEvent)
    {
        CloseHandle(hEndEvent);
        hEndEvent = NULL;
    }
    //FUTURE EXTENSION: hStartEvent and hEndEvent should be closed in case of error too

    //
    // compute time spent by each cpu
    //
    for (unsigned int p = 0; p < g_SystemInformation.processorTopology._ulProcCount; ++p)
    {
        assert(vPerfDone[p].IdleTime.QuadPart >= vPerfInit[p].IdleTime.QuadPart);
        assert(vPerfDone[p].KernelTime.QuadPart >= vPerfInit[p].KernelTime.QuadPart);
        assert(vPerfDone[p].UserTime.QuadPart >= vPerfInit[p].UserTime.QuadPart);
        assert(vPerfDone[p].Reserved1[0].QuadPart >= vPerfInit[p].Reserved1[0].QuadPart);
        assert(vPerfDone[p].Reserved1[1].QuadPart >= vPerfInit[p].Reserved1[1].QuadPart);
        assert(vPerfDone[p].Reserved2 >= vPerfInit[p].Reserved2);

        vPerfDiff[p].IdleTime.QuadPart = vPerfDone[p].IdleTime.QuadPart - vPerfInit[p].IdleTime.QuadPart;
        vPerfDiff[p].KernelTime.QuadPart = vPerfDone[p].KernelTime.QuadPart - vPerfInit[p].KernelTime.QuadPart;
        vPerfDiff[p].UserTime.QuadPart = vPerfDone[p].UserTime.QuadPart - vPerfInit[p].UserTime.QuadPart;
        vPerfDiff[p].Reserved1[0].QuadPart = vPerfDone[p].Reserved1[0].QuadPart - vPerfInit[p].Reserved1[0].QuadPart;
        vPerfDiff[p].Reserved1[1].QuadPart = vPerfDone[p].Reserved1[1].QuadPart - vPerfInit[p].Reserved1[1].QuadPart;
        vPerfDiff[p].Reserved2 = vPerfDone[p].Reserved2 - vPerfInit[p].Reserved2;
    }

    //
    // process results and pass them to the result parser
    //

    // get processors perf. info
    results.vSystemProcessorPerfInfo = vPerfDiff;
    results.ullTimeCount = ullTimeDiff;

    //
    // create structure containing etw results and properties
    //
    results.fUseETW = fUseETW;
    if (fUseETW)
    {
        results.EtwEventCounters = g_EtwEventCounters;
        results.EtwSessionInfo = _GetResultETWSession(pETWSession);

        // TODO: refactor to a separate function
        results.EtwMask.bProcess = profile.GetEtwProcess();
        results.EtwMask.bThread = profile.GetEtwThread();
        results.EtwMask.bImageLoad = profile.GetEtwImageLoad();
        results.EtwMask.bDiskIO = profile.GetEtwDiskIO();
        results.EtwMask.bMemoryPageFaults = profile.GetEtwMemoryPageFaults();
        results.EtwMask.bMemoryHardFaults = profile.GetEtwMemoryHardFaults();
        results.EtwMask.bNetwork = profile.GetEtwNetwork();
        results.EtwMask.bRegistry = profile.GetEtwRegistry();
        results.EtwMask.bUsePagedMemory = profile.GetEtwUsePagedMemory();
        results.EtwMask.bUsePerfTimer = profile.GetEtwUsePerfTimer();
        results.EtwMask.bUseSystemTimer = profile.GetEtwUseSystemTimer();
        results.EtwMask.bUseCyclesCounter = profile.GetEtwUseCyclesCounter();

        free(pETWSession);
    }

    // free memory used by random data write buffers
    for (auto i = vTargets.begin(); i != vTargets.end(); i++)
    {
        i->FreeRandomDataWriteBuffer();
    }

    // TODO: this won't catch error cases, which exit early
    InterlockedExchange(&g_lGeneratorRunning, 0);
    return true;
}