in CoreCLRProfiler/native/coreclr_headers/src/pal/src/synchmgr/synchmanager.cpp [1696:1987]
/*++
Method:
  CPalSynchronizationManager::WorkerThread

Entry point of the synchronization manager's worker thread.

The thread loops reading commands from the process pipe (via
ReadCmdFromProcessPipe) and dispatches on the command type:

  SynchWorkerCmdTerminationRequest       - spawns a thread running
                                           TerminationRequestHandlingRoutine
  SynchWorkerCmdNop                      - during shutdown, ends the loop;
                                           otherwise runs DoMonitorProcesses
                                           and re-computes the poll timeout
  SynchWorkerCmdRemoteSignal             - unregisters the wait described by
                                           the marshaled WaitingThreadsListNode
                                           and wakes up the local target thread
  SynchWorkerCmdDelegatedObjectSignaling - adds the received signal count to
                                           the marshaled CSynchData and
                                           releases the marshaling reference
  SynchWorkerCmdShutdown                 - shuts down the process pipe and
                                           switches the loop to shutdown mode

Once the loop is over, the worker sets its own native wait predicate and
signals its condition variable, then calls ThreadPrepareForShutdown.

Parameters:
  pArg - pointer to the CPalSynchronizationManager instance this worker
         thread serves (passed as LPVOID)

Return value:
  0 (only reached if ThreadPrepareForShutdown returns)
--*/
DWORD PALAPI CPalSynchronizationManager::WorkerThread(LPVOID pArg)
{
    PAL_ERROR palErr;
    bool fShuttingDown = false;
    bool fWorkerIsDone = false;
    int iPollTimeout = INFTIM;
    SynchWorkerCmd swcCmd;
    ThreadWakeupReason twrWakeUpReason;
    SharedID shridMarshaledData;
    DWORD dwData;
    CPalSynchronizationManager * pSynchManager =
        reinterpret_cast<CPalSynchronizationManager*>(pArg);
    CPalThread * pthrWorker = InternalGetCurrentThread();

    while (!fWorkerIsDone)
    {
        palErr = pSynchManager->ReadCmdFromProcessPipe(iPollTimeout,
                                                       &swcCmd,
                                                       &shridMarshaledData,
                                                       &dwData);
        if (NO_ERROR != palErr)
        {
            // Nothing valid was read: log and go back to reading
            ERROR("Received error %x from ReadCmdFromProcessPipe()\n",
                  palErr);
            continue;
        }

        switch (swcCmd)
        {
            case SynchWorkerCmdTerminationRequest:
            {
                // This worker thread is being asked to initiate process termination

                // Initialized to NULL so that the check below is well
                // defined even if InternalCreateThread fails without
                // writing the out parameter. The enclosing braces are
                // needed because an initialized declaration cannot be
                // jumped over by the following case labels.
                HANDLE hTerminationRequestHandlingThread = NULL;

                palErr = InternalCreateThread(pthrWorker,
                                              NULL,
                                              0,
                                              &TerminationRequestHandlingRoutine,
                                              NULL,
                                              0,
                                              PalWorkerThread,
                                              NULL,
                                              &hTerminationRequestHandlingThread);

                if (NO_ERROR != palErr)
                {
                    ERROR("Unable to create worker thread\n");
                }

                // The handle is not needed by the worker: close it if we
                // got one
                if (hTerminationRequestHandlingThread != NULL)
                {
                    CloseHandle(hTerminationRequestHandlingThread);
                }

                break;
            }

            case SynchWorkerCmdNop:
                TRACE("Synch Worker: received SynchWorkerCmdNop\n");
                if (fShuttingDown)
                {
                    TRACE("Synch Worker: received a timeout when "
                          "fShuttingDown==true: worker is done, bailing "
                          "out from the loop\n");

                    // Whether WorkerThreadShuttingDownTimeout has elapsed
                    // or the last process with a descriptor opened for
                    // write on our process pipe, has just closed it,
                    // causing an EOF on the read fd (that can happen only
                    // at shutdown time since during normal run time we
                    // hold a fd opened for write within this process).
                    // In both the case it is time to go for the worker
                    // thread.
                    fWorkerIsDone = true;
                }
                else
                {
                    // Keep polling with a finite timeout only while
                    // DoMonitorProcesses reports a positive count
                    const LONG lProcessCount =
                        pSynchManager->DoMonitorProcesses(pthrWorker);

                    if (lProcessCount > 0)
                    {
                        iPollTimeout = WorkerThreadProcMonitoringTimeout;
                    }
                    else
                    {
                        iPollTimeout = INFTIM;
                    }
                }
                break;

            case SynchWorkerCmdRemoteSignal:
            {
                // Note: this cannot be a wait all
                WaitingThreadsListNode * pWLNode;
                ThreadWaitInfo * ptwiWaitInfo;
                DWORD dwObjIndex;
                bool fSharedSynchLock = false;

                // Lock
                AcquireLocalSynchLock(pthrWorker);
                AcquireSharedSynchLock(pthrWorker);
                fSharedSynchLock = true;

                pWLNode = SharedIDToTypePointer(WaitingThreadsListNode,
                                                shridMarshaledData);

                _ASSERT_MSG(NULL != pWLNode, "Received bad Shared ID %p\n",
                            shridMarshaledData);
                _ASSERT_MSG(gPID == pWLNode->dwProcessId,
                            "Remote signal apparently sent to the wrong "
                            "process [target pid=%u current pid=%u]\n",
                            pWLNode->dwProcessId, gPID);
                _ASSERT_MSG(0 == (WTLN_FLAG_WAIT_ALL & pWLNode->dwFlags),
                            "Wait all with remote awakening delegated "
                            "through SynchWorkerCmdRemoteSignal rather than "
                            "SynchWorkerCmdDelegatedObjectSignaling\n");

                // Get the object index
                dwObjIndex = pWLNode->dwObjIndex;

                // Get the WaitInfo
                ptwiWaitInfo = pWLNode->ptwiWaitInfo;

                // Initialize the WakeUpReason to WaitSucceeded
                twrWakeUpReason = WaitSucceeded;

                CSynchData * psdSynchData =
                    SharedIDToTypePointer(CSynchData,
                                          pWLNode->ptrOwnerObjSynchData.shrid);

                TRACE("Synch Worker: received REMOTE SIGNAL cmd "
                      "[WInfo=%p {Type=%u Domain=%u ObjCount=%d TgtThread=%x} "
                      "SynchData={shriId=%p p=%p} {SigCount=%d IsAbandoned=%d}\n",
                      ptwiWaitInfo, ptwiWaitInfo->wtWaitType, ptwiWaitInfo->wdWaitDomain,
                      ptwiWaitInfo->lObjCount, ptwiWaitInfo->pthrOwner->GetThreadId(),
                      (VOID *)pWLNode->ptrOwnerObjSynchData.shrid, psdSynchData,
                      psdSynchData->GetSignalCount(), psdSynchData->IsAbandoned());

                if (CObjectType::OwnershipTracked ==
                        psdSynchData->GetObjectType()->GetOwnershipSemantics())
                {
                    // Abandoned status is not propagated through process
                    // pipe: need to get it from the object itself before
                    // resetting the data by acquiring the object ownership
                    if (psdSynchData->IsAbandoned())
                    {
                        twrWakeUpReason = MutexAbondoned;
                    }

                    // Acquire ownership
                    palErr = psdSynchData->AssignOwnershipToThread(
                                pthrWorker,
                                ptwiWaitInfo->pthrOwner);
                    if (NO_ERROR != palErr)
                    {
                        ERROR("Synch Worker: AssignOwnershipToThread "
                              "failed with error %u; ownership data on "
                              "object with SynchData %p may be "
                              "corrupted\n", palErr, psdSynchData);
                    }
                }

                // Unregister the wait
                pSynchManager->UnRegisterWait(pthrWorker,
                                              ptwiWaitInfo,
                                              fSharedSynchLock);

                // pWLNode is no longer valid after UnRegisterWait
                pWLNode = NULL;

                TRACE("Synch Worker: Waking up local thread %x "
                      "{WakeUpReason=%u ObjIndex=%u}\n",
                      ptwiWaitInfo->pthrOwner->GetThreadId(),
                      twrWakeUpReason, dwObjIndex);

                // Wake up the target thread
                palErr = WakeUpLocalThread(
                            pthrWorker,
                            ptwiWaitInfo->pthrOwner,
                            twrWakeUpReason,
                            dwObjIndex);
                if (NO_ERROR != palErr)
                {
                    ERROR("Synch Worker: Failed to wake up local thread "
                          "%#x while propagating remote signaling: "
                          "object signaling may be lost\n",
                          ptwiWaitInfo->pthrOwner->GetThreadId());
                }

                // Unlock
                ReleaseSharedSynchLock(pthrWorker);
                fSharedSynchLock = false;
                ReleaseLocalSynchLock(pthrWorker);

                break;
            }

            case SynchWorkerCmdDelegatedObjectSignaling:
            {
                CSynchData * psdSynchData;

                TRACE("Synch Worker: received "
                      "SynchWorkerCmdDelegatedObjectSignaling\n");

                psdSynchData = SharedIDToTypePointer(CSynchData,
                                                     shridMarshaledData);

                _ASSERT_MSG(NULL != psdSynchData, "Received bad Shared ID %p\n",
                            shridMarshaledData);
                _ASSERT_MSG(0 < dwData && (DWORD)INT_MAX > dwData,
                            "Received remote signaling with invalid signal "
                            "count\n");

                // Lock
                AcquireLocalSynchLock(pthrWorker);
                AcquireSharedSynchLock(pthrWorker);

                TRACE("Synch Worker: received DELEGATED OBJECT SIGNALING "
                      "cmd [SynchData={shriId=%p p=%p} SigCount=%u] [Current obj SigCount=%d "
                      "IsAbandoned=%d]\n", (VOID *)shridMarshaledData,
                      psdSynchData, dwData, psdSynchData->GetSignalCount(),
                      psdSynchData->IsAbandoned());

                // Add the delegated signal count on top of the current one
                psdSynchData->Signal(pthrWorker,
                                     psdSynchData->GetSignalCount() + dwData,
                                     true);

                // Current SynchData has been AddRef'd by remote process in
                // order to be marshaled to the current one, therefore at
                // this point we need to release it
                psdSynchData->Release(pthrWorker);

                // Unlock
                ReleaseSharedSynchLock(pthrWorker);
                ReleaseLocalSynchLock(pthrWorker);

                break;
            }

            case SynchWorkerCmdShutdown:
                TRACE("Synch Worker: received SynchWorkerCmdShutdown\n");

                // Shutdown the process pipe: this will cause the process
                // pipe to be unlinked and its write-only file descriptor
                // to be closed, so that when the last fd opened for write
                // on the fifo (from another process) will be closed, we
                // will receive an EOF on the read end (i.e. poll in
                // ReadBytesFromProcessPipe will return 1 with no data to
                // be read). That will allow the worker thread to process
                // possible commands already successfully written to the
                // pipe by some other process, before shutting down.
                pSynchManager->ShutdownProcessPipe();

                // Shutting down: this will cause the worker thread to
                // fetch residual cmds from the process pipe until an
                // EOF is converted to a SynchWorkerCmdNop or the
                // WorkerThreadShuttingDownTimeout has elapsed without
                // receiving any cmd.
                fShuttingDown = true;

                // Set the timeout to WorkerThreadShuttingDownTimeout
                iPollTimeout = WorkerThreadShuttingDownTimeout;
                break;

            default:
                ASSERT("Synch Worker: Unknown worker cmd [swcWorkerCmd=%d]\n",
                       swcCmd);
                break;
        }
    }

    int iRet;
    ThreadNativeWaitData * ptnwdWorkerThreadNativeData =
        &pthrWorker->synchronizationInfo.m_tnwdNativeData;

    // Using the worker thread's predicate/condition/mutex
    // (that normally are never used) to signal the shutting
    // down thread that the worker thread is done
    iRet = pthread_mutex_lock(&ptnwdWorkerThreadNativeData->mutex);
    _ASSERT_MSG(0 == iRet, "Cannot lock mutex [err=%d]\n", iRet);

    ptnwdWorkerThreadNativeData->iPred = TRUE;

    iRet = pthread_cond_signal(&ptnwdWorkerThreadNativeData->cond);
    if (0 != iRet)
    {
        // pthread functions report failures through their return value
        // and do not set errno, so describe iRet rather than errno
        ERROR ("pthread_cond_signal returned %d (%s)\n",
               iRet, strerror(iRet));
    }

    iRet = pthread_mutex_unlock(&ptnwdWorkerThreadNativeData->mutex);
    _ASSERT_MSG(0 == iRet, "Cannot unlock mutex [err=%d]\n", iRet);

    // Sleep forever
    ThreadPrepareForShutdown();

    return 0;
}