in IORequestGenerator/IORequestGenerator.cpp [1062:1666]
//
// threadFunc - entry point of each I/O worker thread.
//
// Prepares everything this thread needs (CPU affinity, privileges, one open
// handle per unique target, target state, data buffers, memory-mapped views,
// result structures, the I/O request list and throughput meters), waits for
// the start event, and then runs the workload using one of three models:
//   - synchronous I/O when the thread has a single I/O request or every
//     target uses memory-mapped I/O;
//   - completion routines when requested and no target is memory-mapped;
//   - an I/O completion port otherwise.
//
// cookie:  a heap-allocated ThreadParameters*. This thread takes ownership:
//          p->pRand and p itself are deleted before returning, on every path.
// returns: 1 on success, 0 on failure. On failure g_bThreadError is also set.
//          g_lRunningThreadsCount is decremented on every exit path to tell
//          the master thread that this worker has finished.
//
// Every error path jumps to the single `cleanup` label, which releases data
// buffers, non-volatile tokens, file handles and the completion port.
//
DWORD WINAPI threadFunc(LPVOID cookie)
{
    bool fOk = true;
    bool fAnyMappedIo = false;
    bool fAllMappedIo = true;
    ThreadParameters *p = reinterpret_cast<ThreadParameters *>(cookie);
    HANDLE hCompletionPort = nullptr;

    //
    // A single file can be specified in multiple targets, so only open one
    // handle for each unique file.
    //
    vector<HANDLE> vhUniqueHandles;
    map<UniqueTarget, UINT32> mHandleMap;

    // IOPS standard deviation needs per-bucket counters; size them up front
    // from the test duration and the configured bucket width.
    bool fCalculateIopsStdDev = p->pTimeSpan->GetCalculateIopsStdDev();
    UINT64 ioBucketDuration = 0;
    UINT32 expectedNumberOfBuckets = 0;
    if (fCalculateIopsStdDev)
    {
        UINT32 ioBucketDurationInMilliseconds = p->pTimeSpan->GetIoBucketDurationInMilliseconds();
        ioBucketDuration = PerfTimer::MillisecondsToPerfTime(ioBucketDurationInMilliseconds);
        expectedNumberOfBuckets = Util::QuotientCeiling(p->pTimeSpan->GetDuration() * 1000, ioBucketDurationInMilliseconds);
    }

    // apply affinity. The specific assignment is provided in the thread profile up front.
    if (!p->pTimeSpan->GetDisableAffinity())
    {
        GROUP_AFFINITY GroupAffinity;

        PrintVerbose(p->pProfile->GetVerbose(), "affinitizing thread %u to Group %u / CPU %u\n", p->ulThreadNo, p->wGroupNum, p->bProcNum);
        SetProcGroupMask(p->wGroupNum, p->bProcNum, &GroupAffinity);

        HANDLE hThread = GetCurrentThread();
        if (SetThreadGroupAffinity(hThread, &GroupAffinity, nullptr) == FALSE)
        {
            PrintError("Error setting affinity mask in thread %u\n", p->ulThreadNo);
            fOk = false;
            goto cleanup;
        }
    }

    // adjust thread token if large pages are needed (once is enough for all targets)
    for (auto pTarget = p->vTargets.begin(); pTarget != p->vTargets.end(); pTarget++)
    {
        if (pTarget->GetUseLargePages())
        {
            if (!SetPrivilege(SE_LOCK_MEMORY_NAME))
            {
                fOk = false;
                goto cleanup;
            }
            break;
        }
    }

    UINT32 cIORequests = p->GetTotalRequestCount();

    size_t iTarget = 0;
    for (auto pTarget = p->vTargets.begin(); pTarget != p->vTargets.end(); pTarget++)
    {
        bool fPhysical = false;
        bool fPartition = false;

        string sPath(pTarget->GetPath());
        const char *filename = sPath.c_str();

        const char *fname = nullptr;    // name passed to CreateFile (may point to physFN)
        char physFN[32];                // device name for a disk/partition target

        if (nullptr == filename || '\0' == *filename)
        {
            PrintError("FATAL ERROR: invalid filename\n");
            fOk = false;
            goto cleanup;
        }

        // "#<n>" selects physical drive <n>
        if ('#' == filename[0] && '\0' != filename[1])
        {
            if (pTarget->GetMemoryMappedIoMode() == MemoryMappedIoMode::On)
            {
                PrintError("Memory mapped I/O is not supported on physical drives\n");
                fOk = false;
                goto cleanup;
            }
            UINT32 nDriveNo = (UINT32)atoi(filename + 1);
            fPhysical = true;
            sprintf_s(physFN, 32, "\\\\.\\PhysicalDrive%u", nDriveNo);
            fname = physFN;
        }

        // "<letter>:" selects a partition; filename[2] is only read once filename[1] is known non-terminal
        if (!fPhysical && '\0' != filename[1] && '\0' == filename[2] && isalpha((unsigned char)filename[0]) && ':' == filename[1])
        {
            if (pTarget->GetMemoryMappedIoMode() == MemoryMappedIoMode::On)
            {
                PrintError("Memory mapped I/O is not supported on partitions\n");
                fOk = false;
                goto cleanup;
            }

            fPartition = true;

            sprintf_s(physFN, 32, "\\\\.\\%c:", filename[0]);
            fname = physFN;
        }

        // anything else is a regular file
        if (!fPhysical && !fPartition)
        {
            fname = sPath.c_str();
        }

        // get/set file flags
        DWORD dwFlags = pTarget->GetCreateFlags(cIORequests > 1);
        DWORD dwDesiredAccess = 0;
        if (pTarget->GetWriteRatio() == 0)
        {
            dwDesiredAccess = GENERIC_READ;
        }
        else if (pTarget->GetWriteRatio() == 100)
        {
            dwDesiredAccess = GENERIC_WRITE;
        }
        else
        {
            dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
        }

        // mapped views are always opened read/write, regardless of the write ratio
        if (pTarget->GetMemoryMappedIoMode() == MemoryMappedIoMode::On)
        {
            dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
            fAnyMappedIo = true;
        }
        else
        {
            fAllMappedIo = false;
        }

        HANDLE hFile;
        UniqueTarget ut;
        ut.path = sPath;
        ut.priority = pTarget->GetIOPriorityHint();
        ut.caching = pTarget->GetCacheMode();
        ut.dwDesiredAccess = dwDesiredAccess;
        ut.dwFlags = dwFlags;

        auto itHandle = mHandleMap.find(ut);
        if (itHandle == mHandleMap.end())
        {
            hFile = CreateFile(fname,
                               dwDesiredAccess,
                               FILE_SHARE_READ | FILE_SHARE_WRITE,
                               nullptr,        //security
                               OPEN_EXISTING,
                               dwFlags,        //flags
                               nullptr);       //template file
            if (INVALID_HANDLE_VALUE == hFile)
            {
                PrintError("Error opening file: %s [%u]\n", sPath.c_str(), GetLastError());
                fOk = false;
                goto cleanup;
            }

            // Register the handle before any further configuration so that
            // the cleanup path closes it if one of the steps below fails.
            mHandleMap[ut] = (UINT32)vhUniqueHandles.size();
            vhUniqueHandles.push_back(hFile);

            if (pTarget->GetCacheMode() == TargetCacheMode::DisableLocalCache)
            {
                DWORD Status = DisableLocalCache(hFile);
                if (Status != ERROR_SUCCESS)
                {
                    PrintError("Failed to disable local caching (error %u). NOTE: only supported on remote filesystems with Windows 8 or newer.\n", Status);
                    fOk = false;
                    goto cleanup;
                }
            }

            //set IO priority
            if (pTarget->GetIOPriorityHint() != IoPriorityHintNormal)
            {
                _declspec(align(8)) FILE_IO_PRIORITY_HINT_INFO hintInfo;
                hintInfo.PriorityHint = pTarget->GetIOPriorityHint();
                if (!SetFileInformationByHandle(hFile, FileIoPriorityHintInfo, &hintInfo, sizeof(hintInfo)))
                {
                    PrintError("Error setting IO priority for file: %s [%u]\n", sPath.c_str(), GetLastError());
                    fOk = false;
                    goto cleanup;
                }
            }
        }
        else
        {
            // an equivalent handle is already open; share it
            hFile = vhUniqueHandles[itHandle->second];
        }

        p->vhTargets.push_back(hFile);

        // obtain file/disk/partition size
        {
            UINT64 fsize = 0;   //file size

            if (fPhysical)
            {
                fsize = GetPhysicalDriveSize(hFile);
            }
            else if (fPartition)
            {
                fsize = GetPartitionSize(hFile);
            }
            else
            {
                // regular file: a low part of INVALID_FILE_SIZE is only an
                // error when GetLastError() also reports one
                ULARGE_INTEGER ulsize;

                ulsize.LowPart = GetFileSize(hFile, &ulsize.HighPart);
                if (INVALID_FILE_SIZE == ulsize.LowPart && GetLastError() != NO_ERROR)
                {
                    PrintError("Error getting file size\n");
                    fOk = false;
                    goto cleanup;
                }
                else
                {
                    fsize = ulsize.QuadPart;
                }
            }

            // check if file size is valid (if it's == 0, it won't be useful)
            if (0 == fsize)
            {
                PrintError("ERROR: target size could not be determined\n");
                fOk = false;
                goto cleanup;
            }

            if (fsize < pTarget->GetMaxFileSize())
            {
                PrintError("WARNING: file size %I64u is less than MaxFileSize %I64u\n", fsize, pTarget->GetMaxFileSize());
            }

            //
            // Build target state.
            //
            p->vTargetStates.emplace_back(
                p,
                iTarget,
                fsize);

            //
            // Ensure this thread can start given stride/size of target.
            //
            if (!p->vTargetStates[iTarget].CanStart())
            {
                PrintError("The file is too small. File: '%s' relative thread %u: file size: %I64u, base offset: %I64u, thread stride: %I64u, block size: %u\n",
                           pTarget->GetPath().c_str(),
                           p->ulRelativeThreadNo,
                           fsize,
                           pTarget->GetBaseFileOffsetInBytes(),
                           pTarget->GetThreadStrideInBytes(),
                           pTarget->GetBlockSizeInBytes());
                fOk = false;
                goto cleanup;
            }

            PrintVerbose(p->pProfile->GetVerbose(), "thread %u starting: file '%s' relative thread %u",
                         p->ulThreadNo,
                         pTarget->GetPath().c_str(),
                         p->ulRelativeThreadNo);

            if (pTarget->GetRandomRatio() > 0)
            {
                // "%%" emits a literal percent sign
                PrintVerbose(p->pProfile->GetVerbose(), ", %u%% random pattern\n",
                             pTarget->GetRandomRatio());
            }
            else
            {
                PrintVerbose(p->pProfile->GetVerbose(), ", %ssequential file offset\n", pTarget->GetUseInterlockedSequential() ? "interlocked ":"");
            }
        }

        // allocate memory for a data buffer
        if (!p->AllocateAndFillBufferForTarget(*pTarget))
        {
            PrintError("ERROR: Could not allocate a buffer for target '%s'. Error code: 0x%x\n", pTarget->GetPath().c_str(), GetLastError());
            fOk = false;
            goto cleanup;
        }

        // initialize memory mapped views of files
        if (pTarget->GetMemoryMappedIoMode() == MemoryMappedIoMode::On)
        {
            NTSTATUS status;
            PVOID nvToken = nullptr;

            pTarget->SetMappedViewFileHandle(hFile);
            if (!p->InitializeMappedViewForTarget(*pTarget, dwDesiredAccess))
            {
                PrintError("ERROR: Could not map view for target '%s'. Error code: 0x%x\n", pTarget->GetPath().c_str(), GetLastError());
                fOk = false;
                goto cleanup;
            }

            if (pTarget->GetWriteThroughMode() == WriteThroughMode::On && nullptr == g_pfnRtlCopyMemoryNonTemporal)
            {
                PrintError("ERROR: Windows runtime environment does not support the non-temporal memory copy API for target '%s'.\n", pTarget->GetPath().c_str());
                fOk = false;
                goto cleanup;
            }

            if ((pTarget->GetMemoryMappedIoFlushMode() == MemoryMappedIoFlushMode::NonVolatileMemory) || (pTarget->GetMemoryMappedIoFlushMode() == MemoryMappedIoFlushMode::NonVolatileMemoryNoDrain))
            {
                // RtlGetNonVolatileToken() works only on DAX enabled PMEM devices.
                if (g_pfnRtlGetNonVolatileToken != nullptr && g_pfnRtlFreeNonVolatileToken != nullptr)
                {
                    status = g_pfnRtlGetNonVolatileToken(pTarget->GetMappedView(), (SIZE_T) pTarget->GetFileSize(), &nvToken);
                    if (!NT_SUCCESS(status))
                    {
                        // the failure is reported through the returned NTSTATUS, not the thread's last-error value
                        PrintError("ERROR: Could not get non-volatile token for target '%s'. Error code: 0x%x\n", pTarget->GetPath().c_str(), (DWORD)status);
                        fOk = false;
                        goto cleanup;
                    }
                    pTarget->SetMemoryMappedIoNvToken(nvToken);
                }
                else
                {
                    PrintError("ERROR: Windows runtime environment does not support the non-volatile memory flushing APIs for target '%s'.\n", pTarget->GetPath().c_str());
                    fOk = false;
                    goto cleanup;
                }
            }
        }

        iTarget++;
    }

    // TODO: copy parameters for better memory locality?
    // TODO: tell the main thread we're ready

    PrintVerbose(p->pProfile->GetVerbose(), "thread %u started (random seed: %u)\n", p->ulThreadNo, p->ulRandSeed);

    p->pResults->vTargetResults.clear();
    p->pResults->vTargetResults.resize(p->vTargets.size());
    for (size_t i = 0; i < p->vTargets.size(); i++)
    {
        p->pResults->vTargetResults[i].sPath = p->vTargets[i].GetPath();
        p->pResults->vTargetResults[i].ullFileSize = p->vTargetStates[i].TargetSize();

        if (fCalculateIopsStdDev)
        {
            p->pResults->vTargetResults[i].readBucketizer.Initialize(ioBucketDuration, expectedNumberOfBuckets);
            p->pResults->vTargetResults[i].writeBucketizer.Initialize(ioBucketDuration, expectedNumberOfBuckets);
        }

        //
        // Copy effective distribution range to results for reporting (may be empty)
        //
        p->pResults->vTargetResults[i].vDistributionRange = p->vTargetStates[i]._vDistributionRange;
    }

    //
    // fill the IORequest structures
    //
    p->vIORequest.clear();

    if (p->pTimeSpan->GetThreadCount() != 0 &&
        p->pTimeSpan->GetRequestCount() != 0)
    {
        //
        // -O case: every request carries every target, each with a weight.
        // The weight for this thread is the matching per-thread override
        // (when present and non-zero) or the target's own weight. It does
        // not depend on the request, so resolve it once per target.
        //
        vector<UINT32> vWeights;
        vWeights.reserve(p->vTargets.size());
        for (auto &target : p->vTargets)
        {
            UINT32 ulWeight = target.GetWeight();
            for (const auto &threadTarget : target.GetThreadTargets())
            {
                if (threadTarget.GetThread() == p->ulRelativeThreadNo)
                {
                    if (threadTarget.GetWeight() != 0)
                    {
                        ulWeight = threadTarget.GetWeight();
                    }
                    break;
                }
            }

            //
            // Parallel async is not supported with -O for exactly this reason,
            // and is validated in the profile before reaching here. Document this
            // with the assert in comparison to the code in the non-O case below.
            // Parallel depends on the IORequest being for a single file only (the
            // seq offset is in the IORequest itself).
            //
            assert(target.GetUseParallelAsyncIO() == false);
            vWeights.push_back(ulWeight);
        }

        p->vIORequest.resize(cIORequests, IORequest(p->pRand));
        for (UINT32 iIORequest = 0; iIORequest < cIORequests; iIORequest++)
        {
            IORequest &ioRequest = p->vIORequest[iIORequest];
            ioRequest.SetRequestIndex(iIORequest);
            for (size_t iFile = 0; iFile < p->vTargets.size(); iFile++)
            {
                ioRequest.AddTarget(&p->vTargets[iFile], vWeights[iFile]);
            }
        }
    }
    else
    {
        // non-O case: each request is bound to exactly one target
        for (size_t iFile = 0; iFile < p->vTargets.size(); iFile++)
        {
            Target *pTarget = &p->vTargets[iFile];

            for (DWORD iRequest = 0; iRequest < pTarget->GetRequestCount(); ++iRequest)
            {
                IORequest ioRequest(p->pRand);
                ioRequest.AddTarget(pTarget, 1);
                ioRequest.SetRequestIndex(iRequest);
                if (pTarget->GetUseParallelAsyncIO())
                {
                    p->vTargetStates[iFile].InitializeParallelAsyncIORequest(ioRequest);
                }

                p->vIORequest.push_back(ioRequest);
            }
        }
    }

    //
    // fill the throughput meter structures
    //
    size_t cTargets = p->vTargets.size();
    bool fUseThrougputMeter = false;
    for (size_t i = 0; i < cTargets; i++)
    {
        ThroughputMeter throughputMeter;
        Target *pTarget = &p->vTargets[i];

        // the target's burst size is shared among the threads working on it
        DWORD dwBurstSize = pTarget->GetBurstSize();
        if (p->pTimeSpan->GetThreadCount() > 0)
        {
            if (pTarget->GetThreadTargets().size() == 0)
            {
                dwBurstSize /= p->pTimeSpan->GetThreadCount();
            }
            else
            {
                dwBurstSize /= (DWORD)pTarget->GetThreadTargets().size();
            }
        }
        else
        {
            dwBurstSize /= pTarget->GetThreadsPerFile();
        }

        if (pTarget->GetThroughputInBytesPerMillisecond() > 0 || pTarget->GetThinkTime() > 0)
        {
            fUseThrougputMeter = true;
            throughputMeter.Start(pTarget->GetThroughputInBytesPerMillisecond(), pTarget->GetBlockSizeInBytes(), pTarget->GetThinkTime(), dwBurstSize);
        }

        p->vThroughputMeters.push_back(throughputMeter);
    }

    // no target is throttled: drop the meters entirely
    if (!fUseThrougputMeter)
    {
        p->vThroughputMeters.clear();
    }

    //FUTURE EXTENSION: enable asynchronous I/O even if only 1 outstanding I/O per file (requires another parameter)
    if (cIORequests == 1 || fAllMappedIo)
    {
        //synchronous IO - no setup needed
    }
    else if (p->pTimeSpan->GetCompletionRoutines() && !fAnyMappedIo)
    {
        //in case of completion routines hEvent field is not used,
        //so we can use it to pass a pointer to the thread parameters
        for (auto &ioRequest : p->vIORequest)
        {
            ioRequest.GetOverlapped()->hEvent = (HANDLE)p;
        }
    }
    else
    {
        //
        // create IO completion port if not doing completion routines or synchronous IO
        //
        for (size_t i = 0; i < vhUniqueHandles.size(); i++)
        {
            // Keep the existing port in hCompletionPort until the call succeeds;
            // on failure CreateIoCompletionPort returns NULL and overwriting
            // hCompletionPort would leak the already-created port.
            HANDLE hPort = CreateIoCompletionPort(vhUniqueHandles[i], hCompletionPort, 0, 1);
            if (nullptr == hPort)
            {
                PrintError("unable to create IO completion port (error code: %u)\n", GetLastError());
                fOk = false;
                goto cleanup;
            }
            hCompletionPort = hPort;
        }
    }

    //
    // wait for a signal to start
    //
    PrintVerbose(p->pProfile->GetVerbose(), "thread %u: waiting for a signal to start\n", p->ulThreadNo);
    if (WAIT_FAILED == WaitForSingleObject(p->hStartEvent, INFINITE))
    {
        PrintError("Waiting for a signal to start failed (error code: %u)\n", GetLastError());
        fOk = false;
        goto cleanup;
    }
    PrintVerbose(p->pProfile->GetVerbose(), "thread %u: received signal to start\n", p->ulThreadNo);

    //check if everything is ok (another thread may have failed its initialization)
    if (g_bError)
    {
        fOk = false;
        goto cleanup;
    }

    //error handling and memory freeing is done in doWorkUsingIOCompletionPorts and doWorkUsingCompletionRoutines
    if (cIORequests == 1 || fAllMappedIo)
    {
        // use synchronous IO (it will also close the event)
        if (!doWorkUsingSynchronousIO(p))
        {
            fOk = false;
            goto cleanup;
        }
    }
    else if (!p->pTimeSpan->GetCompletionRoutines() || fAnyMappedIo)
    {
        // use IO Completion Ports (it will also close the I/O completion port)
        // NOTE(review): cleanup below also closes hCompletionPort; confirm which
        // side owns the port so it is not closed twice.
        if (!doWorkUsingIOCompletionPorts(p, hCompletionPort))
        {
            fOk = false;
            goto cleanup;
        }
    }
    else
    {
        //use completion routines
        if (!doWorkUsingCompletionRoutines(p))
        {
            fOk = false;
            goto cleanup;
        }
    }

    assert(!g_bError);  // at this point we shouldn't be seeing initialization error

cleanup:
    if (!fOk)
    {
        g_bThreadError = TRUE;
    }

    // free memory allocated with VirtualAlloc
    for (auto i = p->vpDataBuffers.begin(); i != p->vpDataBuffers.end(); i++)
    {
        if (nullptr != *i)
        {
#pragma prefast(suppress:6001, "Prefast does not understand this vector will only contain validly allocated buffer pointers")
            VirtualFree(*i, 0, MEM_RELEASE);
        }
    }

    // free NV tokens
    for (auto i = p->vTargets.begin(); i != p->vTargets.end(); i++)
    {
        if (i->GetMemoryMappedIoNvToken() != nullptr && g_pfnRtlFreeNonVolatileToken != nullptr)
        {
            g_pfnRtlFreeNonVolatileToken(i->GetMemoryMappedIoNvToken());
            i->SetMemoryMappedIoNvToken(nullptr);
        }
    }

    // close files (each unique handle exactly once; p->vhTargets may alias them)
    for (auto i = vhUniqueHandles.begin(); i != vhUniqueHandles.end(); i++)
    {
        CloseHandle(*i);
    }

    // close completion ports
    if (hCompletionPort != nullptr)
    {
        CloseHandle(hCompletionPort);
    }

    delete p->pRand;
    delete p;

    // notify master thread that we've finished
    InterlockedDecrement(&g_lRunningThreadsCount);

    return fOk ? 1 : 0;
}