uint32_t messageBlackHole()

in rts/Messages.c [164:310]


uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
{
    const StgInfoTable *info;
    StgClosure *p;
    StgBlockingQueue *bq;
    StgClosure *bh = UNTAG_CLOSURE(msg->bh);
    StgTSO *owner;

    debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on "
                  "blackhole %p", (W_)msg->tso->id, msg->bh);

    info = bh->header.info;

    // If we got this message in our inbox, it might be that the
    // BLACKHOLE has already been updated, and GC has shorted out the
    // indirection, so the pointer no longer points to a BLACKHOLE at
    // all.
    if (info != &stg_BLACKHOLE_info &&
        info != &stg_CAF_BLACKHOLE_info &&
        info != &__stg_EAGER_BLACKHOLE_info &&
        info != &stg_WHITEHOLE_info) {
        // if it is a WHITEHOLE, then a thread is in the process of
        // trying to BLACKHOLE it.  But we know that it was once a
        // BLACKHOLE, so there is at least a valid pointer in the
        // payload, so we can carry on.
        return 0;
    }

    // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
    // or a value.
loop:
    // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
    // and turns this into an infinite loop.
    p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
    info = p->header.info;

    if (info == &stg_IND_info)
    {
        // This could happen, if e.g. we got a BLOCKING_QUEUE that has
        // just been replaced with an IND by another thread in
        // updateThunk().  In which case, if we read the indirectee
        // again we should get the value.
        // See Note [BLACKHOLE pointing to IND] in sm/Evac.c
        goto loop;
    }

    else if (info == &stg_TSO_info)
    {
        owner = (StgTSO*)p;

#if defined(THREADED_RTS)
        if (owner->cap != cap) {
            sendMessage(cap, owner->cap, (Message*)msg);
            debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
                          owner->cap->no);
            return 1;
        }
#endif
        // owner is the owner of the BLACKHOLE, and resides on this
        // Capability.  msg->tso is the first thread to block on this
        // BLACKHOLE, so we first create a BLOCKING_QUEUE object.

        bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));

        // initialise the BLOCKING_QUEUE object
        SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
        bq->bh = bh;
        bq->queue = msg;
        bq->owner = owner;

        msg->link = (MessageBlackHole*)END_TSO_QUEUE;

        // All BLOCKING_QUEUES are linked in a list on owner->bq, so
        // that we can search through them in the event that there is
        // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
        // becomes orphaned (see updateThunk()).
        bq->link = owner->bq;
        owner->bq = bq;
        dirty_TSO(cap, owner); // we modified owner->bq

        // If the owner of the blackhole is currently runnable, then
        // bump it to the front of the run queue.  This gives the
        // blocked-on thread a little boost which should help unblock
        // this thread, and may avoid a pile-up of other threads
        // becoming blocked on the same BLACKHOLE (#3838).
        //
        // NB. we check to make sure that the owner is not the same as
        // the current thread, since in that case it will not be on
        // the run queue.
        if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
            promoteInRunQueue(cap, owner);
        }

        // point to the BLOCKING_QUEUE from the BLACKHOLE
        write_barrier(); // make the BQ visible
        ((StgInd*)bh)->indirectee = (StgClosure *)bq;
        recordClosureMutated(cap,bh); // bh was mutated

        debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
                      (W_)msg->tso->id, (W_)owner->id);

        return 1; // blocked
    }
    else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
             info == &stg_BLOCKING_QUEUE_DIRTY_info)
    {
        StgBlockingQueue *bq = (StgBlockingQueue *)p;

        ASSERT(bq->bh == bh);

        owner = bq->owner;

        ASSERT(owner != END_TSO_QUEUE);

#if defined(THREADED_RTS)
        if (owner->cap != cap) {
            sendMessage(cap, owner->cap, (Message*)msg);
            debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
                          owner->cap->no);
            return 1;
        }
#endif

        msg->link = bq->queue;
        bq->queue = msg;
        recordClosureMutated(cap,(StgClosure*)msg);

        if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
            bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
            recordClosureMutated(cap,(StgClosure*)bq);
        }

        debugTraceCap(DEBUG_sched, cap,
                      "thread %d blocked on existing BLOCKING_QUEUE "
                      "owned by thread %d",
                      (W_)msg->tso->id, (W_)owner->id);

        // See above, #3838
        if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
            promoteInRunQueue(cap, owner);
        }

        return 1; // blocked
    }

    return 0; // not blocked
}