in IORequestGenerator/IORequestGenerator.cpp [2007:2616]
bool IORequestGenerator::_GenerateRequestsForTimeSpan(const Profile& profile, const TimeSpan& timeSpan, Results& results, struct Synchronization *pSynch)
{
//FUTURE EXTENSION: add new I/O capabilities presented in Longhorn
//FUTURE EXTENSION: add a check if the folder is compressed (cache is always enabled in case of compressed folders)
//check if I/O request generator is already running
LONG lGenState = InterlockedExchange(&g_lGeneratorRunning, 1);
if (1 == lGenState)
{
PrintError("FATAL ERROR: I/O Request Generator already running\n");
return false;
}
//initialize all global parameters (in case of second run, after the first one is finished)
_InitializeGlobalParameters();
HANDLE hStartEvent = nullptr; // start event (used to inform the worker threads that they should start the work)
HANDLE hEndEvent = nullptr; // end event (used only in case of completin routines (not for IO Completion Ports))
memset(&g_EtwEventCounters, 0, sizeof(struct ETWEventCounters)); // reset all etw event counters
bool fUseETW = profile.GetEtwEnabled(); //true if user wants ETW
//
// load dlls
//
assert(nullptr == _hNTDLL);
if (!_LoadDLLs())
{
PrintError("Error loading NtQuerySystemInformation\n");
return false;
}
//FUTURE EXTENSION: check for conflicts in alignment (when cache is turned off only sector aligned I/O are permitted)
//FUTURE EXTENSION: check if file sizes are enough to have at least first requests not wrapping around
Random r;
vector<Target> vTargets = timeSpan.GetTargets();
// allocate memory for random data write buffers
for (auto i = vTargets.begin(); i != vTargets.end(); i++)
{
if ((i->GetRandomDataWriteBufferSize() > 0) && !i->AllocateAndFillRandomDataWriteBuffer(&r))
{
return false;
}
}
// check if user wanted to create a file
for (auto i = vTargets.begin(); i != vTargets.end(); i++)
{
if ((i->GetFileSize() > 0) && (i->GetPrecreated() == false))
{
string str = i->GetPath();
if (str.empty())
{
PrintError("You have to provide a filename\n");
return false;
}
//skip physical drives and partitions
if ('#' == str[0] || (':' == str[1] && '\0' == str[2]))
{
continue;
}
//create only regular files
if (!_CreateFile(i->GetFileSize(), str.c_str(), i->GetZeroWriteBuffers(), profile.GetVerbose()))
{
return false;
}
}
}
// get thread count
UINT32 cThreads = timeSpan.GetThreadCount();
if (cThreads < 1)
{
for (auto i = vTargets.begin(); i != vTargets.end(); i++)
{
cThreads += i->GetThreadsPerFile();
}
}
// allocate memory for thread handles
vector<HANDLE> vhThreads(cThreads);
//
// allocate memory for performance counters
//
vector<SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION> vPerfInit(g_SystemInformation.processorTopology._ulProcCount);
vector<SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION> vPerfDone(g_SystemInformation.processorTopology._ulProcCount);
vector<SYSTEM_PROCESSOR_PERFORMANCE_INFORMATION> vPerfDiff(g_SystemInformation.processorTopology._ulProcCount);
//
//create start event
//
hStartEvent = CreateEvent(NULL, TRUE, FALSE, "");
if (NULL == hStartEvent)
{
PrintError("Error creating the start event\n");
return false;
}
//
// create end event
//
if (timeSpan.GetCompletionRoutines())
{
hEndEvent = CreateEvent(NULL, TRUE, FALSE, "");
if (NULL == hEndEvent)
{
PrintError("Error creating the end event\n");
return false;
}
}
//
// set to high priority to ensure the controller thread gets to run immediately
// when signalled.
//
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);
//
// create the threads
//
g_bRun = TRUE;
// gather affinity information, and move to the first active processor
const auto& vAffinity = timeSpan.GetAffinityAssignments();
WORD wGroupCtr = 0;
BYTE bProcCtr = 0;
g_SystemInformation.processorTopology.GetActiveGroupProcessor(wGroupCtr, bProcCtr, false);
volatile bool fAccountingOn = false;
UINT64 ullStartTime; //start time
UINT64 ullTimeDiff; //elapsed test time (in units returned by QueryPerformanceCounter)
vector<UINT64> vullSharedSequentialOffsets(vTargets.size(), 0);
results.vThreadResults.clear();
results.vThreadResults.resize(cThreads);
for (UINT32 iThread = 0; iThread < cThreads; ++iThread)
{
PrintVerbose(profile.GetVerbose(), "creating thread %u\n", iThread);
ThreadParameters *cookie = new ThreadParameters(); // threadFunc is going to free the memory
if (nullptr == cookie)
{
PrintError("FATAL ERROR: could not allocate memory\n");
_AbortWorkerThreads(hStartEvent, vhThreads);
return false;
}
// each thread has a different random seed
Random *pRand = new Random(timeSpan.GetRandSeed() + iThread);
if (nullptr == pRand)
{
PrintError("FATAL ERROR: could not allocate memory\n");
_AbortWorkerThreads(hStartEvent, vhThreads);
delete cookie;
return false;
}
UINT32 ulRelativeThreadNo = 0;
if (timeSpan.GetThreadCount() > 0)
{
// fixed thread mode: threads operate on specified files
// and receive the entire seq index array.
// relative thread number is the same as thread number.
cookie->pullSharedSequentialOffsets = &vullSharedSequentialOffsets[0];
ulRelativeThreadNo = iThread;
for (auto i = vTargets.begin();
i != vTargets.end();
i++)
{
const vector<ThreadTarget> vThreadTargets = i->GetThreadTargets();
// no thread targets specified - add to all threads
if (vThreadTargets.size() == 0)
{
cookie->vTargets.push_back(*i);
}
else
{
// check if the target should be added to the current thread
for (UINT32 iThreadTarget = 0; iThreadTarget < vThreadTargets.size(); iThreadTarget++)
{
if (vThreadTargets[iThreadTarget].GetThread() == iThread)
{
// confirm copy constructor?
cookie->vTargets.push_back(*i);
break;
}
}
}
}
}
else
{
size_t cAssignedThreads = 0;
size_t cBaseThread = 0;
auto psi = vullSharedSequentialOffsets.begin();
for (auto i = vTargets.begin();
i != vTargets.end();
i++, psi++)
{
// per-file thread mode: groups of threads operate on individual files
// and receive the specific seq index for their file (note: singular).
// loop up through the targets to assign thread n to the appropriate file.
// relative thread number is file-relative, so keep track of the base
// thread number for the file and calculate relative to that.
//
// ex: two files, two threads per file
// t0: rt0 for f0 (cAssigned = 2, cBase = 0)
// t1: rt1 for f0 (cAssigned = 2, cBase = 0)
// t2: rt0 for f1 (cAssigned = 4, cBase = 2)
// t3: rt1 for f1 (cAssigned = 4, cBase = 2)
cAssignedThreads += i->GetThreadsPerFile();
if (iThread < cAssignedThreads)
{
// confirm copy constructor?
cookie->vTargets.push_back(*i);
cookie->pullSharedSequentialOffsets = &(*psi);
ulRelativeThreadNo = (iThread - cBaseThread) % i->GetThreadsPerFile();
PrintVerbose(profile.GetVerbose(), "thread %u is relative thread %u for %s\n", iThread, ulRelativeThreadNo, i->GetPath().c_str());
break;
}
cBaseThread += i->GetThreadsPerFile();
}
}
cookie->pProfile = &profile;
cookie->pTimeSpan = &timeSpan;
cookie->hStartEvent = hStartEvent;
cookie->hEndEvent = hEndEvent;
cookie->ulThreadNo = iThread;
cookie->ulRelativeThreadNo = ulRelativeThreadNo;
cookie->pfAccountingOn = &fAccountingOn;
cookie->pullStartTime = &ullStartTime;
cookie->ulRandSeed = timeSpan.GetRandSeed() + iThread; // each thread has a different random seed
cookie->pRand = pRand;
//Set thread group and proc affinity
// Default: Round robin cores in order of groups, starting at group 0.
// Fill each group before moving to next.
if (vAffinity.size() == 0)
{
cookie->wGroupNum = wGroupCtr;
cookie->bProcNum = bProcCtr;
// advance to next active
g_SystemInformation.processorTopology.GetActiveGroupProcessor(wGroupCtr, bProcCtr, true);
}
// Assigned affinity. Round robin through the assignment list.
else
{
ULONG i = iThread % vAffinity.size();
cookie->wGroupNum = vAffinity[i].wGroup;
cookie->bProcNum = vAffinity[i].bProc;
}
//create thread
cookie->pResults = &results.vThreadResults[iThread];
InterlockedIncrement(&g_lRunningThreadsCount);
DWORD dwThreadId;
HANDLE hThread = CreateThread(NULL, 64 * 1024, threadFunc, cookie, 0, &dwThreadId);
if (NULL == hThread)
{
//in case of error terminate running worker threads
PrintError("ERROR: unable to create thread (error code: %u)\n", GetLastError());
InterlockedDecrement(&g_lRunningThreadsCount);
_AbortWorkerThreads(hStartEvent, vhThreads);
delete pRand;
delete cookie;
return false;
}
//store handle to the thread
vhThreads[iThread] = hThread;
}
if (STRUCT_SYNCHRONIZATION_SUPPORTS(pSynch, hStartEvent) && (NULL != pSynch->hStartEvent))
{
if (WAIT_OBJECT_0 != WaitForSingleObject(pSynch->hStartEvent, INFINITE))
{
PrintError("Error during WaitForSingleObject\n");
_AbortWorkerThreads(hStartEvent, vhThreads);
return false;
}
}
//
// get cycle count (it will be used to calculate actual work time)
//
DWORD dwWaitStatus = 0;
//bAccountingOn = FALSE; // clear the accouning flag so that threads didn't count what they do while in the warmup phase
BOOL bSynchStop = STRUCT_SYNCHRONIZATION_SUPPORTS(pSynch, hStopEvent) && (NULL != pSynch->hStopEvent);
BOOL bBreak = FALSE;
PEVENT_TRACE_PROPERTIES pETWSession = NULL;
PrintVerbose(profile.GetVerbose(), "starting warm up...\n");
//
// send start signal
//
if (!SetEvent(hStartEvent))
{
PrintError("Error signaling start event\n");
// stopETW(bUseETW, hTraceSession);
_TerminateWorkerThreads(vhThreads); //FUTURE EXTENSION: timeout for worker threads
return false;
}
//
// wait specified amount of time in each phase (warm up, test, cool down)
//
if (timeSpan.GetWarmup() > 0)
{
TraceLoggingActivity<g_hEtwProvider, DISKSPD_TRACE_INFO, TRACE_LEVEL_NONE> WarmActivity;
TraceLoggingWriteStart(WarmActivity, "Warm Up");
if (bSynchStop)
{
assert(NULL != pSynch->hStopEvent);
dwWaitStatus = WaitForSingleObject(pSynch->hStopEvent, 1000 * timeSpan.GetWarmup());
if (WAIT_OBJECT_0 != dwWaitStatus && WAIT_TIMEOUT != dwWaitStatus)
{
PrintError("Error during WaitForSingleObject\n");
_TerminateWorkerThreads(vhThreads);
return false;
}
bBreak = (WAIT_TIMEOUT != dwWaitStatus);
}
else
{
Sleep(1000 * timeSpan.GetWarmup());
}
TraceLoggingWriteStop(WarmActivity, "Warm Up");
}
if (!bBreak) // proceed only if user didn't break the test
{
//FUTURE EXTENSION: starting ETW session shouldn't be done brutally here, should be done before warmup and here just a fast signal to start logging (see also stopping ETW session)
//FUTURE EXTENSION: put an ETW mark here, for easier parsing by external tools
//
// start etw session
//
TRACEHANDLE hTraceSession = NULL;
if (fUseETW)
{
PrintVerbose(profile.GetVerbose(), "starting trace session\n");
hTraceSession = StartETWSession(profile);
if (NULL == hTraceSession)
{
PrintError("Could not start ETW session\n");
_TerminateWorkerThreads(vhThreads);
return false;
}
if (NULL == CreateThread(NULL, 64 * 1024, etwThreadFunc, NULL, 0, NULL))
{
PrintError("Warning: unable to create thread for ETW session\n");
_TerminateWorkerThreads(vhThreads);
return false;
}
PrintVerbose(profile.GetVerbose(), "tracing events\n");
}
PrintVerbose(profile.GetVerbose(), "starting measurements...\n");
//
// notify the front-end that the test is about to start;
// do it before starting timing in order not to perturb measurements
//
if (STRUCT_SYNCHRONIZATION_SUPPORTS(pSynch, pfnCallbackTestStarted) && (NULL != pSynch->pfnCallbackTestStarted))
{
pSynch->pfnCallbackTestStarted();
}
//
// read performance counters
//
if (_GetSystemPerfInfo(&vPerfInit[0], g_SystemInformation.processorTopology._ulProcCount) == FALSE)
{
PrintError("Error reading performance counters\n");
_StopETW(fUseETW, hTraceSession);
_TerminateWorkerThreads(vhThreads);
return false;
}
TraceLoggingActivity<g_hEtwProvider, DISKSPD_TRACE_INFO, TRACE_LEVEL_NONE> RunActivity;
TraceLoggingWriteStart(RunActivity, "Run Time");
//get cycle count (it will be used to calculate actual work time)
ullStartTime = PerfTimer::GetTime();
#pragma warning( push )
#pragma warning( disable : 28931 )
fAccountingOn = true;
#pragma warning( pop )
assert(timeSpan.GetDuration() > 0);
if (bSynchStop)
{
assert(NULL != pSynch->hStopEvent);
dwWaitStatus = WaitForSingleObject(pSynch->hStopEvent, 1000 * timeSpan.GetDuration());
if (WAIT_OBJECT_0 != dwWaitStatus && WAIT_TIMEOUT != dwWaitStatus)
{
PrintError("Error during WaitForSingleObject\n");
_StopETW(fUseETW, hTraceSession);
_TerminateWorkerThreads(vhThreads); //FUTURE EXTENSION: worker threads should have a chance to free allocated memory (see also other places calling terminateWorkerThreads())
return FALSE;
}
bBreak = (WAIT_TIMEOUT != dwWaitStatus);
}
else
{
Sleep(1000 * timeSpan.GetDuration());
}
fAccountingOn = false;
//get cycle count and perf counters
ullTimeDiff = PerfTimer::GetTime() - ullStartTime;
TraceLoggingWriteStop(RunActivity, "Run Time");
if (_GetSystemPerfInfo(&vPerfDone[0], g_SystemInformation.processorTopology._ulProcCount) == FALSE)
{
PrintError("Error getting performance counters\n");
_StopETW(fUseETW, hTraceSession);
_TerminateWorkerThreads(vhThreads);
return false;
}
//
// notify the front-end that the test has just finished;
// do it after stopping timing in order not to perturb measurements
//
if (STRUCT_SYNCHRONIZATION_SUPPORTS(pSynch, pfnCallbackTestFinished) && (NULL != pSynch->pfnCallbackTestFinished))
{
pSynch->pfnCallbackTestFinished();
}
//
// stop etw session
//
if (fUseETW)
{
PrintVerbose(profile.GetVerbose(), "stopping ETW session\n");
pETWSession = StopETWSession(hTraceSession);
if (NULL == pETWSession)
{
PrintError("Error stopping ETW session\n");
return false;
}
}
}
else
{
ullTimeDiff = 0; // mark that no test was run
}
PrintVerbose(profile.GetVerbose(), "starting cool down...\n");
if ((timeSpan.GetCooldown() > 0) && !bBreak)
{
TraceLoggingActivity<g_hEtwProvider, DISKSPD_TRACE_INFO, TRACE_LEVEL_NONE> CoolActivity;
TraceLoggingWriteStart(CoolActivity, "Cool Down");
if (bSynchStop)
{
assert(NULL != pSynch->hStopEvent);
dwWaitStatus = WaitForSingleObject(pSynch->hStopEvent, 1000 * timeSpan.GetCooldown());
if (WAIT_OBJECT_0 != dwWaitStatus && WAIT_TIMEOUT != dwWaitStatus)
{
PrintError("Error during WaitForSingleObject\n");
// stopETW(bUseETW, hTraceSession);
_TerminateWorkerThreads(vhThreads);
return false;
}
}
else
{
Sleep(1000 * timeSpan.GetCooldown());
}
TraceLoggingWriteStop(CoolActivity, "Cool Down");
}
PrintVerbose(profile.GetVerbose(), "finished test...\n");
//
// signal the threads to finish
//
g_bRun = FALSE;
if (timeSpan.GetCompletionRoutines())
{
if (!SetEvent(hEndEvent))
{
PrintError("Error signaling end event\n");
// stopETW(bUseETW, hTraceSession);
return false;
}
}
//
// wait till all of the threads finish
//
#pragma warning( push )
#pragma warning( disable : 28112 )
while (g_lRunningThreadsCount > 0)
{
Sleep(10); //FUTURE EXTENSION: a timeout should be implemented
}
#pragma warning( pop )
//check if there has been an error during threads execution
if (g_bThreadError)
{
PrintError("There has been an error during threads execution\n");
return false;
}
//
// close events' handles
//
CloseHandle(hStartEvent);
hStartEvent = NULL;
if (NULL != hEndEvent)
{
CloseHandle(hEndEvent);
hEndEvent = NULL;
}
//FUTURE EXTENSION: hStartEvent and hEndEvent should be closed in case of error too
//
// compute time spent by each cpu
//
for (unsigned int p = 0; p < g_SystemInformation.processorTopology._ulProcCount; ++p)
{
assert(vPerfDone[p].IdleTime.QuadPart >= vPerfInit[p].IdleTime.QuadPart);
assert(vPerfDone[p].KernelTime.QuadPart >= vPerfInit[p].KernelTime.QuadPart);
assert(vPerfDone[p].UserTime.QuadPart >= vPerfInit[p].UserTime.QuadPart);
assert(vPerfDone[p].Reserved1[0].QuadPart >= vPerfInit[p].Reserved1[0].QuadPart);
assert(vPerfDone[p].Reserved1[1].QuadPart >= vPerfInit[p].Reserved1[1].QuadPart);
assert(vPerfDone[p].Reserved2 >= vPerfInit[p].Reserved2);
vPerfDiff[p].IdleTime.QuadPart = vPerfDone[p].IdleTime.QuadPart - vPerfInit[p].IdleTime.QuadPart;
vPerfDiff[p].KernelTime.QuadPart = vPerfDone[p].KernelTime.QuadPart - vPerfInit[p].KernelTime.QuadPart;
vPerfDiff[p].UserTime.QuadPart = vPerfDone[p].UserTime.QuadPart - vPerfInit[p].UserTime.QuadPart;
vPerfDiff[p].Reserved1[0].QuadPart = vPerfDone[p].Reserved1[0].QuadPart - vPerfInit[p].Reserved1[0].QuadPart;
vPerfDiff[p].Reserved1[1].QuadPart = vPerfDone[p].Reserved1[1].QuadPart - vPerfInit[p].Reserved1[1].QuadPart;
vPerfDiff[p].Reserved2 = vPerfDone[p].Reserved2 - vPerfInit[p].Reserved2;
}
//
// process results and pass them to the result parser
//
// get processors perf. info
results.vSystemProcessorPerfInfo = vPerfDiff;
results.ullTimeCount = ullTimeDiff;
//
// create structure containing etw results and properties
//
results.fUseETW = fUseETW;
if (fUseETW)
{
results.EtwEventCounters = g_EtwEventCounters;
results.EtwSessionInfo = _GetResultETWSession(pETWSession);
// TODO: refactor to a separate function
results.EtwMask.bProcess = profile.GetEtwProcess();
results.EtwMask.bThread = profile.GetEtwThread();
results.EtwMask.bImageLoad = profile.GetEtwImageLoad();
results.EtwMask.bDiskIO = profile.GetEtwDiskIO();
results.EtwMask.bMemoryPageFaults = profile.GetEtwMemoryPageFaults();
results.EtwMask.bMemoryHardFaults = profile.GetEtwMemoryHardFaults();
results.EtwMask.bNetwork = profile.GetEtwNetwork();
results.EtwMask.bRegistry = profile.GetEtwRegistry();
results.EtwMask.bUsePagedMemory = profile.GetEtwUsePagedMemory();
results.EtwMask.bUsePerfTimer = profile.GetEtwUsePerfTimer();
results.EtwMask.bUseSystemTimer = profile.GetEtwUseSystemTimer();
results.EtwMask.bUseCyclesCounter = profile.GetEtwUseCyclesCounter();
free(pETWSession);
}
// free memory used by random data write buffers
for (auto i = vTargets.begin(); i != vTargets.end(); i++)
{
i->FreeRandomDataWriteBuffer();
}
// TODO: this won't catch error cases, which exit early
InterlockedExchange(&g_lGeneratorRunning, 0);
return true;
}