static bool requestSync()

in rts/Schedule.c [130:553]


#if defined(THREADED_RTS)
static bool requestSync (Capability **pcap, Task *task,
                         PendingSync *sync_type, SyncType *prev_sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(uint32_t n, Capability *cap, Task *task);
static void startWorkerTasks (uint32_t from USED_IF_THREADS,
                              uint32_t to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability **cap);
static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
static bool scheduleHandleYield( Capability *cap, StgTSO *t,
                                 uint32_t prev_what_next );
static void scheduleHandleThreadBlocked( StgTSO *t );
static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
                                          StgTSO *t );
static bool scheduleNeedHeapProfile(bool ready_to_gc);
static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);

static void deleteThread (StgTSO *tso);
static void deleteAllThreads (void);

#if defined(FORKPROCESS_PRIMOP_SUPPORTED)
static void deleteThread_(StgTSO *tso);
#endif

/* ---------------------------------------------------------------------------
   Main scheduling loop.

   We use round-robin scheduling, each thread returning to the
   scheduler loop when one of these conditions is detected:

      * out of heap space
      * timer expires (thread yields)
      * thread blocks
      * thread ends
      * stack overflow

   ------------------------------------------------------------------------ */

static Capability *
schedule (Capability *initialCapability, Task *task)
{
  StgTSO *t;
  Capability *cap;
  StgThreadReturnCode ret;
  uint32_t prev_what_next;
  bool ready_to_gc;

  cap = initialCapability;

  // Pre-condition: this task owns initialCapability.
  // The sched_mutex is *NOT* held
  // NB. on return, we still hold a capability.

  debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

  schedulePreLoop();

  // -----------------------------------------------------------
  // Scheduler loop starts here:

  while (1) {

    // Check whether we have re-entered the RTS from Haskell without
    // going via suspendThread()/resumeThread() (i.e. a 'safe' foreign
    // call).
    if (cap->in_haskell) {
          errorBelch("schedule: re-entered unsafely.\n"
                     "   Perhaps a 'foreign import unsafe' should be 'safe'?");
          stg_exit(EXIT_FAILURE);
    }

    // Note [shutdown]: The interruption / shutdown sequence.
    //
    // In order to cleanly shut down the runtime, we want to:
    //   * make sure that all main threads return to their callers
    //     with the state 'Interrupted'.
    //   * clean up all OS threads associated with the runtime
    //   * free all memory etc.
    //
    // So the sequence goes like this:
    //
    //   * The shutdown sequence is initiated by calling hs_exit(),
    //     interruptStgRts(), or running out of memory in the GC.
    //
    //   * Set sched_state = SCHED_INTERRUPTING
    //
    //   * The scheduler notices sched_state = SCHED_INTERRUPTING and calls
    //     scheduleDoGC(), which halts the whole runtime by acquiring all the
    //     capabilities, does a GC and then calls deleteAllThreads() to kill all
    //     the remaining threads.  The zombies are left on the run queue for
    //     cleaning up.  We can't kill threads involved in foreign calls.
    //
    //   * scheduleDoGC() sets sched_state = SCHED_SHUTTING_DOWN
    //
    //   * After this point, there can be NO MORE HASKELL EXECUTION.  This is
    //     enforced by the scheduler, which won't run any Haskell code when
    //     sched_state >= SCHED_INTERRUPTING, and we already sync'd with the
    //     other capabilities by doing the GC earlier.
    //
    //   * all workers exit when the run queue on their capability
    //     drains.  All main threads will also exit when their TSO
    //     reaches the head of the run queue and they can return.
    //
    //   * eventually all Capabilities will shut down, and the RTS can
    //     exit.
    //
    //   * We might be left with threads blocked in foreign calls;
    //     we should really attempt to kill these somehow (TODO).

    switch (sched_state) {
    case SCHED_RUNNING:
        break;
    case SCHED_INTERRUPTING:
        debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
        /* scheduleDoGC() deletes all the threads */
        scheduleDoGC(&cap,task,true);

        // After scheduleDoGC(), we must be shutting down.  Either some
        // other Capability did the final GC, or we did it above; either
        // way we can fall through to the SCHED_SHUTTING_DOWN case now.
        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
        // fall through

    case SCHED_SHUTTING_DOWN:
        debugTrace(DEBUG_sched, "SCHED_SHUTTING_DOWN");
        // If we are a worker, just exit.  If we're a bound thread
        // then we will exit below when we've removed our TSO from
        // the run queue.
        if (!isBoundTask(task) && emptyRunQueue(cap)) {
            return cap;
        }
        break;
    default:
        barf("sched_state: %" FMT_Word, sched_state);
    }

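    // Look for local work: start any pending signal handlers, process
    // the capability's inbox, check blocked threads and, in the
    // threaded RTS, try to activate a spark (see the forward
    // declarations above).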
    scheduleFindWork(&cap);

    /* work pushing, currently relevant only for THREADED_RTS:
       (pushes threads, wakes up idle capabilities for stealing) */
    schedulePushWork(cap,task);

    scheduleDetectDeadlock(&cap,task);

    // Normally, the only way we can get here with no threads to
    // run is if a keyboard interrupt was received during
    // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
    // Additionally, it is not fatal for the threaded RTS to reach
    // here with no threads to run.
    //
    // win32: might be here due to awaitEvent() being abandoned
    // as a result of a console event having been delivered.

#if defined(THREADED_RTS)
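    // If we have no work, give up the Capability and wait until some
    // arrives; scheduleYield() may return holding a different
    // Capability, which is why it takes &cap.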
    scheduleYield(&cap,task);

    if (emptyRunQueue(cap)) continue; // look for work again
#endif

#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
    if ( emptyRunQueue(cap) ) {
        ASSERT(sched_state >= SCHED_INTERRUPTING);
    }
#endif

    //
    // Get a thread to run
    //
    t = popRunQueue(cap);

    // Sanity check the thread we're about to run.  This can be
    // expensive if there is lots of thread switching going on...
    IF_DEBUG(sanity,checkTSO(t));

#if defined(THREADED_RTS)
    // Check whether we can run this thread in the current task.
    // If not, we have to pass our capability to the right task.
    {
        InCall *bound = t->bound;

        if (bound) {
            if (bound->task == task) {
                // yes, the Haskell thread is bound to the current native thread
            } else {
                debugTrace(DEBUG_sched,
                           "thread %lu bound to another OS thread",
                           (unsigned long)t->id);
                // no, bound to a different OS thread: put it back on
                // the run queue so that thread's task can run it
                pushOnRunQueue(cap,t);
                continue;
            }
        } else {
            // The thread we want to run is unbound.
            if (task->incall->tso) {
                debugTrace(DEBUG_sched,
                           "this OS thread cannot run thread %lu",
                           (unsigned long)t->id);
                // no, the current native thread is bound to a different
                // Haskell thread, so pass it to any worker thread
                pushOnRunQueue(cap,t);
                continue;
            }
        }
    }
#endif

    // If we're shutting down, and this thread has not yet been
    // killed, kill it now.  This sometimes happens when a finalizer
    // thread is created by the final GC, or a thread previously
    // in a foreign call returns.
    if (sched_state >= SCHED_INTERRUPTING &&
        !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
        deleteThread(t);
    }

    // If this capability is disabled, migrate the thread away rather
    // than running it.  NB. but not if the thread is bound: it is
    // really hard for a bound thread to migrate itself.  Believe me,
    // I tried several ways and couldn't find a way to do it.
    // Instead, when everything is stopped for GC, we migrate all the
    // threads on the run queue then (see scheduleDoGC()).
    //
    // ToDo: what about TSO_LOCKED?  Currently we're migrating those
    // when the number of capabilities drops, but we never migrate
    // them back if it rises again.  Presumably we should, but after
    // the thread has been migrated we no longer know what capability
    // it was originally on.
#if defined(THREADED_RTS)
    if (cap->disabled && !t->bound) {
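        // Pick the destination deterministically from the capabilities
        // that remain enabled: every thread on disabled capability N
        // migrates to capability N mod enabled_capabilities.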
        Capability *dest_cap = capabilities[cap->no % enabled_capabilities];
        migrateThread(cap, t, dest_cap);
        continue;
    }
#endif

    /* Context switches are normally initiated by the timer signal,
     * unless the user asked to context switch as often as possible
     * (+RTS -C0).  In that case we force a switch before every run,
     * but only when some other thread is waiting to run.
     */
    if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
        && !emptyThreadQueues(cap)) {
        cap->context_switch = 1;
    }

run_thread:

    // CurrentTSO is the thread to run.  t might be different if we
    // loop back to run_thread, so we must set rCurrentTSO here, after
    // the run_thread label.
    cap->r.rCurrentTSO = t;

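    // Restart the heap-profiling timer while the thread runs; it is
    // stopped again below, where scheduler costs revert to CCS_SYSTEM.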
    startHeapProfTimer();

    // ----------------------------------------------------------------------
    // Run the current thread

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);
    ASSERT(t->bound ? t->bound->task->cap == cap : 1);

    prev_what_next = t->what_next;

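    // Restore the errno (and, on Windows, the last error code) saved
    // when this thread last stopped; the matching save happens below,
    // after the thread returns to the scheduler.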
    errno = t->saved_errno;
#if defined(mingw32_HOST_OS)
    SetLastError(t->saved_winerror);
#endif

    // reset the interrupt flag before running Haskell code
    cap->interrupt = 0;

    cap->in_haskell = true;
    cap->idle = 0;

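    // The thread is about to run and will mutate its TSO and stack
    // without write barriers, so mark both dirty for the generational
    // GC.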
    dirty_TSO(cap,t);
    dirty_STACK(cap,t->stackobj);

    switch (recent_activity)
    {
    case ACTIVITY_DONE_GC: {
        // ACTIVITY_DONE_GC means we turned off the timer signal to
        // conserve power (see #1623).  Re-enable it here.
        uint32_t prev;
        prev = xchg((P_)&recent_activity, ACTIVITY_YES);
        if (prev == ACTIVITY_DONE_GC) {
#if !defined(PROFILING)
            startTimer();
#endif
        }
        break;
    }
    case ACTIVITY_INACTIVE:
        // If we reached ACTIVITY_INACTIVE, then don't reset it until
        // we've done the GC.  The thread running here might just be
        // the IO manager thread that handle_tick() woke up via
        // wakeUpRts().
        break;
    default:
        recent_activity = ACTIVITY_YES;
    }

    traceEventRunThread(cap, t);

    switch (prev_what_next) {

    case ThreadKilled:
    case ThreadComplete:
        /* Thread already finished, return to scheduler. */
        ret = ThreadFinished;
        break;

    case ThreadRunGHC:
    {
        StgRegTable *r;
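        // Enter compiled Haskell code via the STG machine; StgRun()
        // returns to the scheduler when the thread yields, blocks,
        // finishes, or overflows its heap or stack.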
        r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
        cap = regTableToCapability(r);
        ret = r->rRet;
        break;
    }

    case ThreadInterpret:
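        // The thread is executing byte code (BCOs), so run it in the
        // byte-code interpreter rather than via StgRun().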
        cap = interpretBCO(cap);
        ret = cap->r.rRet;
        break;

    default:
        barf("schedule: invalid prev_what_next=%u field", prev_what_next);
    }

    cap->in_haskell = false;

    // The TSO might have moved, eg. if it re-entered the RTS and a GC
    // happened.  So find the new location:
    t = cap->r.rCurrentTSO;

    // cap->r.rCurrentTSO is charged for calls to allocate(), so we
    // don't want it set when not running a Haskell thread.
    cap->r.rCurrentTSO = NULL;

    // And save the current errno in this thread.
    // XXX: possibly bogus for SMP because this thread might already
    // be running again; see the code below.
    t->saved_errno = errno;
#if defined(mingw32_HOST_OS)
    // Similarly for Windows error code
    t->saved_winerror = GetLastError();
#endif

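    // Emit a "stop thread" event.  For a blocked thread the status is
    // why_blocked + 6, which shifts the BlockedOn* reasons past the six
    // ThreadReturnCode-based status codes in the eventlog encoding.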
    if (ret == ThreadBlocked) {
        if (t->why_blocked == BlockedOnBlackHole) {
            StgTSO *owner = blackHoleOwner(t->block_info.bh->bh);
            traceEventStopThread(cap, t, t->why_blocked + 6,
                                 owner != NULL ? owner->id : 0);
        } else {
            traceEventStopThread(cap, t, t->why_blocked + 6, 0);
        }
    } else {
        if (ret == StackOverflow) {
          traceEventStopThread(cap, t, ret, t->tot_stack_size);
        } else {
          traceEventStopThread(cap, t, ret, 0);
        }
    }

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT(t->cap == cap);

    // ----------------------------------------------------------------------

    // Costs for the scheduler are assigned to CCS_SYSTEM
    stopHeapProfTimer();
#if defined(PROFILING)
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    schedulePostRunThread(cap,t);

    ready_to_gc = false;

    switch (ret) {
    case HeapOverflow:
        ready_to_gc = scheduleHandleHeapOverflow(cap,t);
        break;

    case StackOverflow:
        // just adjust the stack for this thread, then push it back
        // on the run queue.
        threadStackOverflow(cap, t);
        pushOnRunQueue(cap,t);
        break;

    case ThreadYielding:
        if (scheduleHandleYield(cap, t, prev_what_next)) {
            // shortcut for switching between compiler/interpreter:
            goto run_thread;
        }
        break;

    case ThreadBlocked:
        scheduleHandleThreadBlocked(t);
        break;

    case ThreadFinished:
        if (scheduleHandleThreadFinished(cap, task, t)) return cap;
        ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
        break;

    default:
      barf("schedule: invalid thread return code %d", (int)ret);
    }

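    // A GC may be needed either because the thread ran out of heap
    // (ready_to_gc) or because a heap census is due; either way this is
    // a normal collection, not a forced major one (force_major = false).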
    if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
      scheduleDoGC(&cap,task,false);
    }
  } /* end of while() */
}