DWORD WINAPI threadFunc()

in IORequestGenerator/IORequestGenerator.cpp [1062:1666]


//
// Worker thread entry point.
//
// cookie is a ThreadParameters* describing this thread's targets and settings.
// This function takes ownership of it: both p->pRand and p itself are deleted
// before returning.
//
// Sequence:
//   1. Apply the CPU affinity assigned in the thread profile (unless disabled).
//   2. Acquire SE_LOCK_MEMORY_NAME if any target uses large pages.
//   3. Open one handle per unique (path, priority, caching, access, flags) target.
//   4. Size each target, build its target state, and allocate its buffers and
//      memory-mapped views.
//   5. Build the IORequest and throughput-meter structures.
//   6. Set up the I/O completion mechanism: synchronous, completion routines,
//      or an I/O completion port.
//   7. Wait on p->hStartEvent, then run the matching work loop.
//
// On any failure g_bThreadError is set. On every path, data buffers, NV tokens,
// file handles and the completion port are released, and g_lRunningThreadsCount
// is decremented to notify the master thread.
//
// Returns 1 on success, 0 on failure.
//
DWORD WINAPI threadFunc(LPVOID cookie)
{
    bool fOk = true;
    bool fAnyMappedIo = false;
    bool fAllMappedIo = true;
    ThreadParameters *p = reinterpret_cast<ThreadParameters *>(cookie);
    HANDLE hCompletionPort = nullptr;

    // Declared up front (and assigned later, at their original points of use)
    // because the error paths below "goto cleanup" past those points. Standard
    // C++ forbids a jump that bypasses a declaration with an initializer.
    UINT32 cIORequests = 0;
    size_t iTarget = 0;
    size_t cTargets = 0;
    bool fUseThroughputMeter = false;

    //
    // A single file can be specified in multiple targets, so only open one
    // handle for each unique file.
    //

    vector<HANDLE> vhUniqueHandles;         // every handle opened by this thread; closed in cleanup
    map<UniqueTarget, UINT32> mHandleMap;   // unique open parameters -> index into vhUniqueHandles

    bool fCalculateIopsStdDev = p->pTimeSpan->GetCalculateIopsStdDev();
    UINT64 ioBucketDuration = 0;
    UINT32 expectedNumberOfBuckets = 0;
    if (fCalculateIopsStdDev)
    {
        UINT32 ioBucketDurationInMilliseconds = p->pTimeSpan->GetIoBucketDurationInMilliseconds();
        ioBucketDuration = PerfTimer::MillisecondsToPerfTime(ioBucketDurationInMilliseconds);
        expectedNumberOfBuckets = Util::QuotientCeiling(p->pTimeSpan->GetDuration() * 1000, ioBucketDurationInMilliseconds);
    }

    // apply affinity. The specific assignment is provided in the thread profile up front.
    if (!p->pTimeSpan->GetDisableAffinity())
    {
        GROUP_AFFINITY GroupAffinity;

        PrintVerbose(p->pProfile->GetVerbose(), "affinitizing thread %u to Group %u / CPU %u\n", p->ulThreadNo, p->wGroupNum, p->bProcNum);
        SetProcGroupMask(p->wGroupNum, p->bProcNum, &GroupAffinity);

        HANDLE hThread = GetCurrentThread();
        if (SetThreadGroupAffinity(hThread, &GroupAffinity, nullptr) == FALSE)
        {
            PrintError("Error setting affinity mask in thread %u\n", p->ulThreadNo);
            fOk = false;
            goto cleanup;
        }
    }

    // adjust thread token if large pages are needed (once is enough for all targets)
    for (auto pTarget = p->vTargets.begin(); pTarget != p->vTargets.end(); ++pTarget)
    {
        if (pTarget->GetUseLargePages())
        {
            if (!SetPrivilege(SE_LOCK_MEMORY_NAME))
            {
                fOk = false;
                goto cleanup;
            }
            break;
        }
    }

    cIORequests = p->GetTotalRequestCount();

    iTarget = 0;
    for (auto pTarget = p->vTargets.begin(); pTarget != p->vTargets.end(); ++pTarget)
    {
        bool fPhysical = false;
        bool fPartition = false;

        string sPath(pTarget->GetPath());
        const char *filename = sPath.c_str();

        const char *fname = nullptr;    // name passed to CreateFile (can point to physFN)
        char physFN[32];                // disk/partition device name

        // c_str() never returns null, so an empty path is the only invalid case.
        if ('\0' == filename[0])
        {
            PrintError("FATAL ERROR: invalid filename\n");
            fOk = false;
            goto cleanup;
        }

        // "#<n>" selects physical drive n
        if ('#' == filename[0] && '\0' != filename[1])
        {
            if (pTarget->GetMemoryMappedIoMode() == MemoryMappedIoMode::On)
            {
                PrintError("Memory mapped I/O is not supported on physical drives\n");
                fOk = false;
                goto cleanup;
            }
            UINT32 nDriveNo = (UINT32)atoi(filename + 1);
            fPhysical = true;
            sprintf_s(physFN, 32, "\\\\.\\PhysicalDrive%u", nDriveNo);
            fname = physFN;
        }

        // "<letter>:" selects a partition. The '\0' != filename[1] test guards
        // the read of filename[2].
        if (!fPhysical && '\0' != filename[1] && '\0' == filename[2] && isalpha((unsigned char)filename[0]) && ':' == filename[1])
        {
            if (pTarget->GetMemoryMappedIoMode() == MemoryMappedIoMode::On)
            {
                PrintError("Memory mapped I/O is not supported on partitions\n");
                fOk = false;
                goto cleanup;
            }
            fPartition = true;

            sprintf_s(physFN, 32, "\\\\.\\%c:", filename[0]);
            fname = physFN;
        }

        // anything else is a regular file path
        if (!fPhysical && !fPartition)
        {
            fname = filename;
        }

        // get/set file flags
        DWORD dwFlags = pTarget->GetCreateFlags(cIORequests > 1);
        DWORD dwDesiredAccess = 0;
        if (pTarget->GetWriteRatio() == 0)
        {
            dwDesiredAccess = GENERIC_READ;
        }
        else if (pTarget->GetWriteRatio() == 100)
        {
            dwDesiredAccess = GENERIC_WRITE;
        }
        else
        {
            dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
        }

        // memory-mapped targets are always opened for read and write
        if (pTarget->GetMemoryMappedIoMode() == MemoryMappedIoMode::On)
        {
            dwDesiredAccess = GENERIC_READ | GENERIC_WRITE;
            fAnyMappedIo = true;
        }
        else
        {
            fAllMappedIo = false;
        }

        HANDLE hFile;
        UniqueTarget ut;
        ut.path = sPath;
        ut.priority = pTarget->GetIOPriorityHint();
        ut.caching = pTarget->GetCacheMode();
        ut.dwDesiredAccess = dwDesiredAccess;
        ut.dwFlags = dwFlags;

        auto itHandle = mHandleMap.find(ut);
        if (itHandle != mHandleMap.end())
        {
            // same file with identical open parameters: share the existing handle
            hFile = vhUniqueHandles[itHandle->second];
        }
        else
        {
            hFile = CreateFile(fname,
                dwDesiredAccess,
                FILE_SHARE_READ | FILE_SHARE_WRITE,
                nullptr,        //security
                OPEN_EXISTING,
                dwFlags,        //flags
                nullptr);       //template file
            if (INVALID_HANDLE_VALUE == hFile)
            {
                PrintError("Error opening file: %s [%u]\n", sPath.c_str(), GetLastError());
                fOk = false;
                goto cleanup;
            }

            // Register the handle immediately so cleanup closes it even if one
            // of the per-handle setup steps below fails.
            mHandleMap.emplace(ut, static_cast<UINT32>(vhUniqueHandles.size()));
            vhUniqueHandles.push_back(hFile);

            if (pTarget->GetCacheMode() == TargetCacheMode::DisableLocalCache)
            {
                DWORD Status = DisableLocalCache(hFile);
                if (Status != ERROR_SUCCESS)
                {
                    PrintError("Failed to disable local caching (error %u). NOTE: only supported on remote filesystems with Windows 8 or newer.\n", Status);
                    fOk = false;
                    goto cleanup;
                }
            }

            //set IO priority
            if (pTarget->GetIOPriorityHint() != IoPriorityHintNormal)
            {
                _declspec(align(8)) FILE_IO_PRIORITY_HINT_INFO hintInfo;
                hintInfo.PriorityHint = pTarget->GetIOPriorityHint();
                if (!SetFileInformationByHandle(hFile, FileIoPriorityHintInfo, &hintInfo, sizeof(hintInfo)))
                {
                    PrintError("Error setting IO priority for file: %s [%u]\n", sPath.c_str(), GetLastError());
                    fOk = false;
                    goto cleanup;
                }
            }
        }

        p->vhTargets.push_back(hFile);

        // obtain file/disk/partition size
        {
            UINT64 fsize = 0;   //file size

            if (fPhysical)
            {
                fsize = GetPhysicalDriveSize(hFile);
            }
            else if (fPartition)
            {
                fsize = GetPartitionSize(hFile);
            }
            else
            {
                ULARGE_INTEGER ulsize;

                // INVALID_FILE_SIZE is also a legal low part, so it only signals
                // failure when the last error is set.
                ulsize.LowPart = GetFileSize(hFile, &ulsize.HighPart);
                if (INVALID_FILE_SIZE == ulsize.LowPart && GetLastError() != NO_ERROR)
                {
                    PrintError("Error getting file size\n");
                    fOk = false;
                    goto cleanup;
                }
                fsize = ulsize.QuadPart;
            }

            // a zero-sized target cannot be used
            if (0 == fsize)
            {
                PrintError("ERROR: target size could not be determined\n");
                fOk = false;
                goto cleanup;
            }

            if (fsize < pTarget->GetMaxFileSize())
            {
                PrintError("WARNING: file size %I64u is less than MaxFileSize %I64u\n", fsize, pTarget->GetMaxFileSize());
            }

            //
            // Build target state.
            //

            p->vTargetStates.emplace_back(
                p,
                iTarget,
                fsize);

            //
            // Ensure this thread can start given stride/size of target.
            //

            if (!p->vTargetStates[iTarget].CanStart())
            {
                PrintError("The file is too small. File: '%s' relative thread %u: file size: %I64u, base offset: %I64u, thread stride: %I64u, block size: %u\n",
                    pTarget->GetPath().c_str(),
                    p->ulRelativeThreadNo,
                    fsize,
                    pTarget->GetBaseFileOffsetInBytes(),
                    pTarget->GetThreadStrideInBytes(),
                    pTarget->GetBlockSizeInBytes());
                fOk = false;
                goto cleanup;
            }

            PrintVerbose(p->pProfile->GetVerbose(), "thread %u starting: file '%s' relative thread %u",
                p->ulThreadNo,
                pTarget->GetPath().c_str(),
                p->ulRelativeThreadNo);

            if (pTarget->GetRandomRatio() > 0)
            {
                // "%%" emits a literal percent sign
                PrintVerbose(p->pProfile->GetVerbose(), ", %u%% random pattern\n",
                    pTarget->GetRandomRatio());
            }
            else
            {
                PrintVerbose(p->pProfile->GetVerbose(), ", %ssequential file offset\n", pTarget->GetUseInterlockedSequential() ? "interlocked ":"");
            }
        }

        // allocate memory for a data buffer
        if (!p->AllocateAndFillBufferForTarget(*pTarget))
        {
            PrintError("ERROR: Could not allocate a buffer for target '%s'. Error code: 0x%x\n", pTarget->GetPath().c_str(), GetLastError());
            fOk = false;
            goto cleanup;
        }

        // initialize memory mapped views of files
        if (pTarget->GetMemoryMappedIoMode() == MemoryMappedIoMode::On)
        {
            NTSTATUS status;
            PVOID nvToken = nullptr;

            pTarget->SetMappedViewFileHandle(hFile);
            if (!p->InitializeMappedViewForTarget(*pTarget, dwDesiredAccess))
            {
                PrintError("ERROR: Could not map view for target '%s'. Error code: 0x%x\n", pTarget->GetPath().c_str(), GetLastError());
                fOk = false;
                goto cleanup;
            }

            if (pTarget->GetWriteThroughMode() == WriteThroughMode::On && nullptr == g_pfnRtlCopyMemoryNonTemporal)
            {
                PrintError("ERROR: Windows runtime environment does not support the non-temporal memory copy API for target '%s'.\n", pTarget->GetPath().c_str());
                fOk = false;
                goto cleanup;
            }

            if ((pTarget->GetMemoryMappedIoFlushMode() == MemoryMappedIoFlushMode::NonVolatileMemory) || (pTarget->GetMemoryMappedIoFlushMode() == MemoryMappedIoFlushMode::NonVolatileMemoryNoDrain))
            {
                // RtlGetNonVolatileToken() works only on DAX enabled PMEM devices.
                if (g_pfnRtlGetNonVolatileToken != nullptr && g_pfnRtlFreeNonVolatileToken != nullptr)
                {
                    status = g_pfnRtlGetNonVolatileToken(pTarget->GetMappedView(), (SIZE_T) pTarget->GetFileSize(), &nvToken);
                    if (!NT_SUCCESS(status))
                    {
                        // Failure comes back as the NTSTATUS checked above, so
                        // report that value rather than GetLastError().
                        PrintError("ERROR: Could not get non-volatile token for target '%s'. Error code: 0x%x\n", pTarget->GetPath().c_str(), static_cast<unsigned int>(status));
                        fOk = false;
                        goto cleanup;
                    }
                    pTarget->SetMemoryMappedIoNvToken(nvToken);
                }
                else
                {
                    PrintError("ERROR: Windows runtime environment does not support the non-volatile memory flushing APIs for target '%s'.\n", pTarget->GetPath().c_str());
                    fOk = false;
                    goto cleanup;
                }
            }
        }

        iTarget++;
    }

    // TODO: copy parameters for better memory locality?
    // TODO: tell the main thread we're ready

    PrintVerbose(p->pProfile->GetVerbose(), "thread %u started (random seed: %u)\n", p->ulThreadNo, p->ulRandSeed);

    p->pResults->vTargetResults.clear();
    p->pResults->vTargetResults.resize(p->vTargets.size());

    for (size_t i = 0; i < p->vTargets.size(); i++)
    {
        p->pResults->vTargetResults[i].sPath = p->vTargets[i].GetPath();
        p->pResults->vTargetResults[i].ullFileSize = p->vTargetStates[i].TargetSize();

        if (fCalculateIopsStdDev)
        {
            p->pResults->vTargetResults[i].readBucketizer.Initialize(ioBucketDuration, expectedNumberOfBuckets);
            p->pResults->vTargetResults[i].writeBucketizer.Initialize(ioBucketDuration, expectedNumberOfBuckets);
        }

        //
        // Copy effective distribution range to results for reporting (may be empty)
        //

        p->pResults->vTargetResults[i].vDistributionRange = p->vTargetStates[i]._vDistributionRange;
    }

    //
    // fill the IORequest structures
    //

    p->vIORequest.clear();

    if (p->pTimeSpan->GetThreadCount() != 0 &&
        p->pTimeSpan->GetRequestCount() != 0)
    {
        // Each of the cIORequests requests spans every target. A target's
        // weight for this thread does not depend on the request, so resolve it
        // once per target instead of once per (request, target) pair.
        vector<UINT32> vTargetWeights;
        vTargetWeights.reserve(p->vTargets.size());

        for (size_t iFile = 0; iFile < p->vTargets.size(); iFile++)
        {
            Target *pTarget = &p->vTargets[iFile];
            const vector<ThreadTarget> &vThreadTargets = pTarget->GetThreadTargets();
            UINT32 ulWeight = pTarget->GetWeight();

            // A non-zero per-thread weight for this relative thread overrides the target weight.
            for (const ThreadTarget &threadTarget : vThreadTargets)
            {
                if (threadTarget.GetThread() == p->ulRelativeThreadNo)
                {
                    if (threadTarget.GetWeight() != 0)
                    {
                        ulWeight = threadTarget.GetWeight();
                    }
                    break;
                }
            }

            //
            // Parallel async is not supported with -O for exactly this reason,
            // and is validated in the profile before reaching here. Document this
            // with the assert in comparison to the code in the non-O case below.
            // Parallel depends on the IORequest being for a single file only (the
            // seq offset is in the IORequest itself).
            //

            assert(pTarget->GetUseParallelAsyncIO() == false);

            vTargetWeights.push_back(ulWeight);
        }

        p->vIORequest.resize(cIORequests, IORequest(p->pRand));

        for (UINT32 iIORequest = 0; iIORequest < cIORequests; iIORequest++)
        {
            p->vIORequest[iIORequest].SetRequestIndex(iIORequest);

            for (size_t iFile = 0; iFile < p->vTargets.size(); iFile++)
            {
                p->vIORequest[iIORequest].AddTarget(&p->vTargets[iFile], vTargetWeights[iFile]);
            }
        }
    }
    else
    {
        // one single-target request per outstanding I/O of each target
        for (size_t iFile = 0; iFile < p->vTargets.size(); iFile++)
        {
            Target *pTarget = &p->vTargets[iFile];

            for (DWORD iRequest = 0; iRequest < pTarget->GetRequestCount(); ++iRequest)
            {
                IORequest ioRequest(p->pRand);
                ioRequest.AddTarget(pTarget, 1);
                ioRequest.SetRequestIndex(iRequest);
                if (pTarget->GetUseParallelAsyncIO())
                {
                    p->vTargetStates[iFile].InitializeParallelAsyncIORequest(ioRequest);
                }

                p->vIORequest.push_back(ioRequest);
            }
        }
    }

    //
    // fill the throughput meter structures
    //
    cTargets = p->vTargets.size();
    fUseThroughputMeter = false;
    for (size_t i = 0; i < cTargets; i++)
    {
        ThroughputMeter throughputMeter;
        Target *pTarget = &p->vTargets[i];

        // split the target's burst size across the threads that share it
        DWORD dwBurstSize = pTarget->GetBurstSize();
        if (p->pTimeSpan->GetThreadCount() > 0)
        {
            if (pTarget->GetThreadTargets().size() == 0)
            {
                dwBurstSize /= p->pTimeSpan->GetThreadCount();
            }
            else
            {
                dwBurstSize /= (DWORD)pTarget->GetThreadTargets().size();
            }
        }
        else
        {
            dwBurstSize /= pTarget->GetThreadsPerFile();
        }

        if (pTarget->GetThroughputInBytesPerMillisecond() > 0 || pTarget->GetThinkTime() > 0)
        {
            fUseThroughputMeter = true;
            throughputMeter.Start(pTarget->GetThroughputInBytesPerMillisecond(), pTarget->GetBlockSizeInBytes(), pTarget->GetThinkTime(), dwBurstSize);
        }

        p->vThroughputMeters.push_back(throughputMeter);
    }

    // meters are kept only if at least one target is throttled
    if (!fUseThroughputMeter)
    {
        p->vThroughputMeters.clear();
    }

    //FUTURE EXTENSION: enable asynchronous I/O even if only 1 outstanding I/O per file (requires another parameter)
    if (cIORequests == 1 || fAllMappedIo)
    {
        //synchronous IO - no setup needed
    }
    else if (p->pTimeSpan->GetCompletionRoutines() && !fAnyMappedIo)
    {
        //in case of completion routines hEvent field is not used,
        //so we can use it to pass a pointer to the thread parameters
        for (UINT32 iIORequest = 0; iIORequest < cIORequests; iIORequest++)
        {
            OVERLAPPED *pOverlapped = p->vIORequest[iIORequest].GetOverlapped();
            pOverlapped->hEvent = (HANDLE)p;
        }
    }
    else
    {
        //
        // Create the IO completion port (first iteration) and associate every
        // unique handle with it. Assign only on success so a failed association
        // cannot overwrite, and so leak, the already-created port.
        //
        for (size_t i = 0; i < vhUniqueHandles.size(); i++)
        {
            HANDLE hPort = CreateIoCompletionPort(vhUniqueHandles[i], hCompletionPort, 0, 1);
            if (nullptr == hPort)
            {
                PrintError("unable to create IO completion port (error code: %u)\n", GetLastError());
                fOk = false;
                goto cleanup;
            }
            hCompletionPort = hPort;
        }
    }

    //
    // wait for a signal to start
    //
    PrintVerbose(p->pProfile->GetVerbose(), "thread %u: waiting for a signal to start\n", p->ulThreadNo);
    if (WAIT_FAILED == WaitForSingleObject(p->hStartEvent, INFINITE))
    {
        PrintError("Waiting for a signal to start failed (error code: %u)\n", GetLastError());
        fOk = false;
        goto cleanup;
    }
    PrintVerbose(p->pProfile->GetVerbose(), "thread %u: received signal to start\n", p->ulThreadNo);

    // g_bError is checked after the start signal to see whether initialization failed
    if (g_bError)
    {
        fOk = false;
        goto cleanup;
    }

    // Dispatch to the same I/O model selected in the setup above. Buffers,
    // tokens, handles and the completion port are released in cleanup below.
    if (cIORequests == 1 || fAllMappedIo)
    {
        // use synchronous IO
        if (!doWorkUsingSynchronousIO(p))
        {
            fOk = false;
            goto cleanup;
        }
    }
    else if (!p->pTimeSpan->GetCompletionRoutines() || fAnyMappedIo)
    {
        // use IO Completion Ports
        // NOTE(review): an earlier comment claimed this routine also closes the
        // completion port. If so, the CloseHandle(hCompletionPort) in cleanup
        // is a double close. Confirm against doWorkUsingIOCompletionPorts.
        if (!doWorkUsingIOCompletionPorts(p, hCompletionPort))
        {
            fOk = false;
            goto cleanup;
        }
    }
    else
    {
        //use completion routines
        if (!doWorkUsingCompletionRoutines(p))
        {
            fOk = false;
            goto cleanup;
        }
    }

    assert(!g_bError);  // at this point we shouldn't be seeing initialization error

cleanup:
    if (!fOk)
    {
        g_bThreadError = TRUE;
    }

    // free memory allocated with VirtualAlloc
    for (auto i = p->vpDataBuffers.begin(); i != p->vpDataBuffers.end(); ++i)
    {
        if (nullptr != *i)
        {
#pragma prefast(suppress:6001, "Prefast does not understand this vector will only contain validly allocated buffer pointers")
            VirtualFree(*i, 0, MEM_RELEASE);
        }
    }

    // free NV tokens
    for (auto i = p->vTargets.begin(); i != p->vTargets.end(); ++i)
    {
        if (i->GetMemoryMappedIoNvToken() != nullptr && g_pfnRtlFreeNonVolatileToken != nullptr)
        {
            g_pfnRtlFreeNonVolatileToken(i->GetMemoryMappedIoNvToken());
            i->SetMemoryMappedIoNvToken(nullptr);
        }
    }

    // close files (every handle opened above is registered here, including
    // those whose setup failed)
    for (auto i = vhUniqueHandles.begin(); i != vhUniqueHandles.end(); ++i)
    {
        CloseHandle(*i);
    }

    // close completion ports
    if (hCompletionPort != nullptr)
    {
        CloseHandle(hCompletionPort);
    }

    delete p->pRand;
    delete p;

    // notify master thread that we've finished
    InterlockedDecrement(&g_lRunningThreadsCount);

    return fOk ? 1 : 0;
}