DWORD PALAPI CPalSynchronizationManager::WorkerThread()

in CoreCLRProfiler/native/coreclr_headers/src/pal/src/synchmgr/synchmanager.cpp [1696:1987]


    /*++
    Method:
      CPalSynchronizationManager::WorkerThread

    Entry point of the synchronization manager's worker thread. It loops
    reading commands through ReadCmdFromProcessPipe and dispatches on the
    command received:
      - SynchWorkerCmdTerminationRequest: spawns a thread running
        TerminationRequestHandlingRoutine;
      - SynchWorkerCmdNop: either ends the loop (when shutting down) or
        calls DoMonitorProcesses and re-arms the poll timeout accordingly;
      - SynchWorkerCmdRemoteSignal: unregisters the wait described by the
        marshaled WaitingThreadsListNode and wakes up the local waiter;
      - SynchWorkerCmdDelegatedObjectSignaling: signals the marshaled
        CSynchData with the received signal count and releases it;
      - SynchWorkerCmdShutdown: shuts down the process pipe and switches
        the loop to the shutting-down timeout.
    Once the loop ends, the worker sets its own native-data predicate and
    signals its condition variable to notify the shutting-down thread,
    then calls ThreadPrepareForShutdown.

    Parameters:
      pArg - pointer to the CPalSynchronizationManager instance this
             worker serves (reinterpreted from LPVOID).

    Return value:
      0. Per the in-code comment, ThreadPrepareForShutdown is expected to
      sleep forever, so the return is presumably never reached in
      practice.
    --*/
    DWORD PALAPI CPalSynchronizationManager::WorkerThread(LPVOID pArg)
    {
        PAL_ERROR palErr;
        bool fShuttingDown = false;
        bool fWorkerIsDone = false;
        int iPollTimeout = INFTIM;
        SynchWorkerCmd swcCmd;
        ThreadWakeupReason twrWakeUpReason;
        SharedID shridMarshaledData;
        DWORD dwData;
        CPalSynchronizationManager * pSynchManager =
            reinterpret_cast<CPalSynchronizationManager*>(pArg);
        CPalThread * pthrWorker = InternalGetCurrentThread();

        while (!fWorkerIsDone)
        {
            LONG lProcessCount;

            palErr = pSynchManager->ReadCmdFromProcessPipe(iPollTimeout,
                                                           &swcCmd,
                                                           &shridMarshaledData,
                                                           &dwData);
            if (NO_ERROR != palErr)
            {
                ERROR("Received error %x from ReadCmdFromProcessPipe()\n",
                      palErr);

                // No valid command was read: skip dispatching and retry
                continue;
            }
            switch (swcCmd)
            {
                case SynchWorkerCmdTerminationRequest:
                {
                    // This worker thread is being asked to initiate process termination

                    // Initialized to NULL so that, if thread creation
                    // fails without writing the out-parameter, we never
                    // test or close an indeterminate handle value. The
                    // enclosing braces keep the other case labels from
                    // jumping across this initialization.
                    HANDLE hTerminationRequestHandlingThread = NULL;
                    palErr = InternalCreateThread(pthrWorker,
                                      NULL,
                                      0,
                                      &TerminationRequestHandlingRoutine,
                                      NULL,
                                      0,
                                      PalWorkerThread,
                                      NULL,
                                      &hTerminationRequestHandlingThread);

                    if (NO_ERROR != palErr)
                    {
                        ERROR("Unable to create worker thread\n");
                    }

                    // The handle itself is not needed by the worker
                    if (hTerminationRequestHandlingThread != NULL)
                    {
                        CloseHandle(hTerminationRequestHandlingThread);
                    }

                    break;
                }
                case SynchWorkerCmdNop:
                    TRACE("Synch Worker: received SynchWorkerCmdNop\n");
                    if (fShuttingDown)
                    {
                        TRACE("Synch Worker: received a timeout when "
                              "fShuttingDown==true: worker is done, bailing "
                              "out from the loop\n");

                        // Whether WorkerThreadShuttingDownTimeout has elapsed
                        // or the last process with a descriptor opened for
                        // write on our process pipe, has just closed it,
                        // causing an EOF on the read fd (that can happen only
                        // at shutdown time since during normal run time we
                        // hold a fd opened for write within this process).
                        // In both the case it is time to go for the worker
                        // thread.
                        fWorkerIsDone = true;
                    }
                    else
                    {
                        // Keep polling with a finite timeout only while
                        // DoMonitorProcesses reports a positive count
                        lProcessCount = pSynchManager->DoMonitorProcesses(pthrWorker);
                        if (lProcessCount > 0)
                        {
                            iPollTimeout = WorkerThreadProcMonitoringTimeout;
                        }
                        else
                        {
                            iPollTimeout = INFTIM;
                        }
                    }
                    break;
                case SynchWorkerCmdRemoteSignal:
                {
                    // Note: this cannot be a wait all
                    WaitingThreadsListNode * pWLNode;
                    ThreadWaitInfo * ptwiWaitInfo;
                    DWORD dwObjIndex;
                    bool fSharedSynchLock = false;

                    // Lock
                    AcquireLocalSynchLock(pthrWorker);
                    AcquireSharedSynchLock(pthrWorker);
                    fSharedSynchLock = true;

                    pWLNode = SharedIDToTypePointer(WaitingThreadsListNode,
                                                    shridMarshaledData);

                    _ASSERT_MSG(NULL != pWLNode, "Received bad Shared ID %p\n",
                                shridMarshaledData);
                    _ASSERT_MSG(gPID == pWLNode->dwProcessId,
                                "Remote signal apparently sent to the wrong "
                                "process [target pid=%u current pid=%u]\n",
                                pWLNode->dwProcessId, gPID);
                    _ASSERT_MSG(0 == (WTLN_FLAG_WAIT_ALL & pWLNode->dwFlags),
                                "Wait all with remote awakening delegated "
                                "through SynchWorkerCmdRemoteSignal rather than "
                                "SynchWorkerCmdDelegatedObjectSignaling\n");


                    // Get the object index
                    dwObjIndex = pWLNode->dwObjIndex;

                    // Get the WaitInfo
                    ptwiWaitInfo = pWLNode->ptwiWaitInfo;

                    // Initialize the WakeUpReason to WaitSucceeded
                    twrWakeUpReason = WaitSucceeded;

                    CSynchData * psdSynchData =
                        SharedIDToTypePointer(CSynchData,
                                              pWLNode->ptrOwnerObjSynchData.shrid);

                    TRACE("Synch Worker: received REMOTE SIGNAL cmd "
                        "[WInfo=%p {Type=%u Domain=%u ObjCount=%d TgtThread=%x} "
                        "SynchData={shriId=%p p=%p} {SigCount=%d IsAbandoned=%d}\n",
                        ptwiWaitInfo, ptwiWaitInfo->wtWaitType, ptwiWaitInfo->wdWaitDomain,
                        ptwiWaitInfo->lObjCount, ptwiWaitInfo->pthrOwner->GetThreadId(),
                        (VOID *)pWLNode->ptrOwnerObjSynchData.shrid, psdSynchData,
                        psdSynchData->GetSignalCount(), psdSynchData->IsAbandoned());

                    if (CObjectType::OwnershipTracked ==
                        psdSynchData->GetObjectType()->GetOwnershipSemantics())
                    {
                        // Abandoned status is not propagated through process
                        // pipe: need to get it from the object itself before
                        // resetting the data by acquiring the object ownership
                        if (psdSynchData->IsAbandoned())
                        {
                            twrWakeUpReason = MutexAbondoned;
                        }

                        // Acquire ownership
                        palErr = psdSynchData->AssignOwnershipToThread(
                                    pthrWorker,
                                    ptwiWaitInfo->pthrOwner);
                        if (NO_ERROR != palErr)
                        {
                            ERROR("Synch Worker: AssignOwnershipToThread "
                                  "failed with error %u; ownership data on "
                                  "object with SynchData %p may be "
                                  "corrupted\n", palErr, psdSynchData);
                        }
                    }

                    // Unregister the wait
                    pSynchManager->UnRegisterWait(pthrWorker,
                                                  ptwiWaitInfo,
                                                  fSharedSynchLock);

                    // pWLNode is no longer valid after UnRegisterWait
                    pWLNode = NULL;

                    TRACE("Synch Worker: Waking up local thread %x "
                          "{WakeUpReason=%u ObjIndex=%u}\n",
                          ptwiWaitInfo->pthrOwner->GetThreadId(),
                          twrWakeUpReason, dwObjIndex);

                    // Wake up the target thread
                    palErr = WakeUpLocalThread(
                        pthrWorker,
                        ptwiWaitInfo->pthrOwner,
                        twrWakeUpReason,
                        dwObjIndex);
                    if (NO_ERROR != palErr)
                    {
                        ERROR("Synch Worker: Failed to wake up local thread "
                              "%#x while propagating remote signaling: "
                              "object signaling may be lost\n",
                              ptwiWaitInfo->pthrOwner->GetThreadId());
                    }

                    // Unlock
                    ReleaseSharedSynchLock(pthrWorker);
                    fSharedSynchLock = false;
                    ReleaseLocalSynchLock(pthrWorker);

                    break;
                }
                case SynchWorkerCmdDelegatedObjectSignaling:
                {
                    CSynchData * psdSynchData;

                    TRACE("Synch Worker: received "
                          "SynchWorkerCmdDelegatedObjectSignaling\n");

                    psdSynchData = SharedIDToTypePointer(CSynchData,
                                                       shridMarshaledData);

                    _ASSERT_MSG(NULL != psdSynchData, "Received bad Shared ID %p\n",
                                shridMarshaledData);
                    _ASSERT_MSG(0 < dwData && (DWORD)INT_MAX > dwData,
                                "Received remote signaling with invalid signal "
                                "count\n");

                    // Lock
                    AcquireLocalSynchLock(pthrWorker);
                    AcquireSharedSynchLock(pthrWorker);

                    TRACE("Synch Worker: received DELEGATED OBJECT SIGNALING "
                        "cmd [SynchData={shriId=%p p=%p} SigCount=%u] [Current obj SigCount=%d "
                        "IsAbandoned=%d]\n", (VOID *)shridMarshaledData,
                        psdSynchData, dwData, psdSynchData->GetSignalCount(),
                        psdSynchData->IsAbandoned());

                    // Add the received signal count to the current one
                    psdSynchData->Signal(pthrWorker,
                                       psdSynchData->GetSignalCount() + dwData,
                                       true);

                    // Current SynchData has been AddRef'd by remote process in
                    // order to be marshaled to the current one, therefore at
                    // this point we need to release it
                    psdSynchData->Release(pthrWorker);

                    // Unlock
                    ReleaseSharedSynchLock(pthrWorker);
                    ReleaseLocalSynchLock(pthrWorker);

                    break;
                }
                case SynchWorkerCmdShutdown:
                    TRACE("Synch Worker: received SynchWorkerCmdShutdown\n");

                    // Shutdown the process pipe: this will cause the process
                    // pipe to be unlinked and its write-only file descriptor
                    // to be closed, so that when the last fd opened for write
                    // on the fifo (from another process) will be closed, we
                    // will receive an EOF on the read end (i.e. poll in
                    // ReadBytesFromProcessPipe will return 1 with no data to
                    // be read). That will allow the worker thread to process
                    // possible commands already successfully written to the
                    // pipe by some other process, before shutting down.
                    pSynchManager->ShutdownProcessPipe();

                    // Shutting down: this will cause the worker thread to
                    // fetch residual cmds from the process pipe until an
                    // EOF is converted to a SynchWorkerCmdNop or the
                    // WorkerThreadShuttingDownTimeout has elapsed without
                    // receiving any cmd.
                    fShuttingDown = true;

                    // Set the timeout to WorkerThreadShuttingDownTimeout
                    iPollTimeout = WorkerThreadShuttingDownTimeout;
                    break;
                default:
                    ASSERT("Synch Worker: Unknown worker cmd [swcWorkerCmd=%d]\n",
                           swcCmd);
                    break;
            }
        }

        int iRet;
        ThreadNativeWaitData * ptnwdWorkerThreadNativeData =
            &pthrWorker->synchronizationInfo.m_tnwdNativeData;

        // Using the worker thread's predicate/condition/mutex
        // (that normally are never used) to signal the shutting
        // down thread that the worker thread is done
        iRet = pthread_mutex_lock(&ptnwdWorkerThreadNativeData->mutex);
        _ASSERT_MSG(0 == iRet, "Cannot lock mutex [err=%d]\n", iRet);

        ptnwdWorkerThreadNativeData->iPred = TRUE;

        iRet = pthread_cond_signal(&ptnwdWorkerThreadNativeData->cond);
        if (0 != iRet)
        {
            // pthread functions report failures through their return
            // value and do not set errno, so describe iRet itself
            ERROR ("pthread_cond_signal returned %d [errno=%d (%s)]\n",
                   iRet, iRet, strerror(iRet));
        }

        iRet = pthread_mutex_unlock(&ptnwdWorkerThreadNativeData->mutex);
        _ASSERT_MSG(0 == iRet, "Cannot unlock mutex [err=%d]\n", iRet);

        // Sleep forever
        ThreadPrepareForShutdown();

        return 0;
    }