in rts/Schedule.c [130:553]
static bool requestSync (Capability **pcap, Task *task,
PendingSync *sync_type, SyncType *prev_sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
static void startWorkerTasks (uint32_t from USED_IF_THREADS,
uint32_t to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static bool scheduleHandleYield( Capability *cap, StgTSO *t,
uint32_t prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
StgTSO *t );
static bool scheduleNeedHeapProfile(bool ready_to_gc);
static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
static void deleteThread (StgTSO *tso);
static void deleteAllThreads (void);
#if defined(FORKPROCESS_PRIMOP_SUPPORTED)
static void deleteThread_(StgTSO *tso);
#endif
/* ---------------------------------------------------------------------------
Main scheduling loop.
We use round-robin scheduling, each thread returning to the
scheduler loop when one of these conditions is detected:
* out of heap space
* timer expires (thread yields)
* thread blocks
* thread ends
* stack overflow
------------------------------------------------------------------------ */
static Capability *
schedule (Capability *initialCapability, Task *task)
{
StgTSO *t;
Capability *cap;
StgThreadReturnCode ret;
uint32_t prev_what_next;
bool ready_to_gc;
cap = initialCapability;
// Pre-condition: this task owns initialCapability.
// The sched_mutex is *NOT* held
// NB. on return, we still hold a capability.
debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
schedulePreLoop();
// -----------------------------------------------------------
// Scheduler loop starts here:
while (1) {
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
if (cap->in_haskell) {
errorBelch("schedule: re-entered unsafely.\n"
" Perhaps a 'foreign import unsafe' should be 'safe'?");
stg_exit(EXIT_FAILURE);
}
// Note [shutdown]: The interruption / shutdown sequence.
//
// In order to cleanly shut down the runtime, we want to:
// * make sure that all main threads return to their callers
// with the state 'Interrupted'.
// * clean up all OS threads assocated with the runtime
// * free all memory etc.
//
// So the sequence goes like this:
//
// * The shutdown sequence is initiated by calling hs_exit(),
// interruptStgRts(), or running out of memory in the GC.
//
// * Set sched_state = SCHED_INTERRUPTING
//
// * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
// scheduleDoGC(), which halts the whole runtime by acquiring all the
// capabilities, does a GC and then calls deleteAllThreads() to kill all
// the remaining threads. The zombies are left on the run queue for
// cleaning up. We can't kill threads involved in foreign calls.
//
// * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
//
// * After this point, there can be NO MORE HASKELL EXECUTION. This is
// enforced by the scheduler, which won't run any Haskell code when
// sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
// other capabilities by doing the GC earlier.
//
// * all workers exit when the run queue on their capability
// drains. All main threads will also exit when their TSO
// reaches the head of the run queue and they can return.
//
// * eventually all Capabilities will shut down, and the RTS can
// exit.
//
// * We might be left with threads blocked in foreign calls,
// we should really attempt to kill these somehow (TODO).
switch (sched_state) {
case SCHED_RUNNING:
break;
case SCHED_INTERRUPTING:
debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
/* scheduleDoGC() deletes all the threads */
scheduleDoGC(&cap,task,true);
// after scheduleDoGC(), we must be shutting down. Either some
// other Capability did the final GC, or we did it above,
// either way we can fall through to the SCHED_SHUTTING_DOWN
// case now.
ASSERT(sched_state == SCHED_SHUTTING_DOWN);
// fall through
case SCHED_SHUTTING_DOWN:
debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
// If we are a worker, just exit. If we're a bound thread
// then we will exit below when we've removed our TSO from
// the run queue.
if (!isBoundTask(task) && emptyRunQueue(cap)) {
return cap;
}
break;
default:
barf("sched_state: %" FMT_Word, sched_state);
}
scheduleFindWork(&cap);
/* work pushing, currently relevant only for THREADED_RTS:
(pushes threads, wakes up idle capabilities for stealing) */
schedulePushWork(cap,task);
scheduleDetectDeadlock(&cap,task);
// Normally, the only way we can get here with no threads to
// run is if a keyboard interrupt received during
// scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
// Additionally, it is not fatal for the
// threaded RTS to reach here with no threads to run.
//
// win32: might be here due to awaitEvent() being abandoned
// as a result of a console event having been delivered.
#if defined(THREADED_RTS)
scheduleYield(&cap,task);
if (emptyRunQueue(cap)) continue; // look for work again
#endif
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
if ( emptyRunQueue(cap) ) {
ASSERT(sched_state >= SCHED_INTERRUPTING);
}
#endif
//
// Get a thread to run
//
t = popRunQueue(cap);
// Sanity check the thread we're about to run. This can be
// expensive if there is lots of thread switching going on...
IF_DEBUG(sanity,checkTSO(t));
#if defined(THREADED_RTS)
// Check whether we can run this thread in the current task.
// If not, we have to pass our capability to the right task.
{
InCall *bound = t->bound;
if (bound) {
if (bound->task == task) {
// yes, the Haskell thread is bound to the current native thread
} else {
debugTrace(DEBUG_sched,
"thread %lu bound to another OS thread",
(unsigned long)t->id);
// no, bound to a different Haskell thread: pass to that thread
pushOnRunQueue(cap,t);
continue;
}
} else {
// The thread we want to run is unbound.
if (task->incall->tso) {
debugTrace(DEBUG_sched,
"this OS thread cannot run thread %lu",
(unsigned long)t->id);
// no, the current native thread is bound to a different
// Haskell thread, so pass it to any worker thread
pushOnRunQueue(cap,t);
continue;
}
}
}
#endif
// If we're shutting down, and this thread has not yet been
// killed, kill it now. This sometimes happens when a finalizer
// thread is created by the final GC, or a thread previously
// in a foreign call returns.
if (sched_state >= SCHED_INTERRUPTING &&
!(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
deleteThread(t);
}
// If this capability is disabled, migrate the thread away rather
// than running it. NB. but not if the thread is bound: it is
// really hard for a bound thread to migrate itself. Believe me,
// I tried several ways and couldn't find a way to do it.
// Instead, when everything is stopped for GC, we migrate all the
// threads on the run queue then (see scheduleDoGC()).
//
// ToDo: what about TSO_LOCKED? Currently we're migrating those
// when the number of capabilities drops, but we never migrate
// them back if it rises again. Presumably we should, but after
// the thread has been migrated we no longer know what capability
// it was originally on.
#if defined(THREADED_RTS)
if (cap->disabled && !t->bound) {
Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
migrateThread(cap, t, dest_cap);
continue;
}
#endif
/* context switches are initiated by the timer signal, unless
* the user specified "context switch as often as possible", with
* +RTS -C0
*/
if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
&& !emptyThreadQueues(cap)) {
cap->context_switch = 1;
}
run_thread:
// CurrentTSO is the thread to run. It might be different if we
// loop back to run_thread, so make sure to set CurrentTSO after
// that.
cap->r.rCurrentTSO = t;
startHeapProfTimer();
// ----------------------------------------------------------------------
// Run the current thread
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
ASSERT(t->bound ? t->bound->task->cap == cap : 1);
prev_what_next = t->what_next;
errno = t->saved_errno;
#if defined(mingw32_HOST_OS)
SetLastError(t->saved_winerror);
#endif
// reset the interrupt flag before running Haskell code
cap->interrupt = 0;
cap->in_haskell = true;
cap->idle = 0;
dirty_TSO(cap,t);
dirty_STACK(cap,t->stackobj);
switch (recent_activity)
{
case ACTIVITY_DONE_GC: {
// ACTIVITY_DONE_GC means we turned off the timer signal to
// conserve power (see #1623). Re-enable it here.
uint32_t prev;
prev = xchg((P_)&recent_activity, ACTIVITY_YES);
if (prev == ACTIVITY_DONE_GC) {
#if !defined(PROFILING)
startTimer();
#endif
}
break;
}
case ACTIVITY_INACTIVE:
// If we reached ACTIVITY_INACTIVE, then don't reset it until
// we've done the GC. The thread running here might just be
// the IO manager thread that handle_tick() woke up via
// wakeUpRts().
break;
default:
recent_activity = ACTIVITY_YES;
}
traceEventRunThread(cap, t);
switch (prev_what_next) {
case ThreadKilled:
case ThreadComplete:
/* Thread already finished, return to scheduler. */
ret = ThreadFinished;
break;
case ThreadRunGHC:
{
StgRegTable *r;
r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
cap = regTableToCapability(r);
ret = r->rRet;
break;
}
case ThreadInterpret:
cap = interpretBCO(cap);
ret = cap->r.rRet;
break;
default:
barf("schedule: invalid prev_what_next=%u field", prev_what_next);
}
cap->in_haskell = false;
// The TSO might have moved, eg. if it re-entered the RTS and a GC
// happened. So find the new location:
t = cap->r.rCurrentTSO;
// cap->r.rCurrentTSO is charged for calls to allocate(), so we
// don't want it set when not running a Haskell thread.
cap->r.rCurrentTSO = NULL;
// And save the current errno in this thread.
// XXX: possibly bogus for SMP because this thread might already
// be running again, see code below.
t->saved_errno = errno;
#if defined(mingw32_HOST_OS)
// Similarly for Windows error code
t->saved_winerror = GetLastError();
#endif
if (ret == ThreadBlocked) {
if (t->why_blocked == BlockedOnBlackHole) {
StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
traceEventStopThread(cap, t, t->why_blocked + 6,
owner != NULL ? owner->id : 0);
} else {
traceEventStopThread(cap, t, t->why_blocked + 6, 0);
}
} else {
if (ret == StackOverflow) {
traceEventStopThread(cap, t, ret, t->tot_stack_size);
} else {
traceEventStopThread(cap, t, ret, 0);
}
}
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
// ----------------------------------------------------------------------
// Costs for the scheduler are assigned to CCS_SYSTEM
stopHeapProfTimer();
#if defined(PROFILING)
cap->r.rCCCS = CCS_SYSTEM;
#endif
schedulePostRunThread(cap,t);
ready_to_gc = false;
switch (ret) {
case HeapOverflow:
ready_to_gc = scheduleHandleHeapOverflow(cap,t);
break;
case StackOverflow:
// just adjust the stack for this thread, then pop it back
// on the run queue.
threadStackOverflow(cap, t);
pushOnRunQueue(cap,t);
break;
case ThreadYielding:
if (scheduleHandleYield(cap, t, prev_what_next)) {
// shortcut for switching between compiler/interpreter:
goto run_thread;
}
break;
case ThreadBlocked:
scheduleHandleThreadBlocked(t);
break;
case ThreadFinished:
if (scheduleHandleThreadFinished(cap, task, t)) return cap;
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
break;
default:
barf("schedule: invalid thread return code %d", (int)ret);
}
if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
scheduleDoGC(&cap,task,false);
}
} /* end of while() */
}