Sint transcode_dist_obuf()

in erts/emulator/beam/external.c [5720:6098]


Sint transcode_dist_obuf(ErtsDistOutputBuf* ob,
                         DistEntry* dep,
                         Uint64 dflags,
                         Sint reds)
{
    ErlIOVec* eiov = ob->eiov;
    SysIOVec* iov = eiov->iov;
    byte *hdr;
    Uint64 hopefull_flags;
    Uint32 hopefull_ix, payload_ix;
    Sint start_r, r;
    Uint new_len;
    byte *ep;

    if (reds < 0)
        return reds;

    /*
     * HOPEFUL_DATA header always present in io vector
     * element 1:
     *
     * +---+--------------+-----------+----------+
     * |'H'|Hopefull Flags|Hopefull IX|Payload IX|
     * +---+--------------+-----------+----------+
     *   1         8            4          4
     *
     * Hopefull flags: Flags corresponding to actual
     *                 hopefull encodings in this
     *                 buffer.
     * Hopefull IX:    Vector index of first hopefull
     *                 encoding. Each hopefull encoding
     *                 is preceeded by 4 bytes containing
     *                 next vector index of hopefull
     *                 encoding. ERTS_NO_HIX marks the
     *                 end.
     * Payload IX:     Vector index of the beginning
     *                 of the payload if there is
     *                 one; otherwise, zero.
     */
    hdr = (byte *) iov[1].iov_base;

    ASSERT(HOPEFUL_DATA == *((byte *)iov[1].iov_base));
    ASSERT(iov[1].iov_len == 1+8+4+4);
    
    /* Control message always begin in vector element 2 */
    ep = iov[2].iov_base;
    ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT);

    if (~dflags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)
        && ep[0] == SMALL_TUPLE_EXT
        && ep[1] == 4
        && ep[2] == SMALL_INTEGER_EXT
        && (ep[3] == DOP_MONITOR_P ||
            ep[3] == DOP_MONITOR_P_EXIT ||
            ep[3] == DOP_DEMONITOR_P)) {
        /*
         * Receiver does not support process monitoring.
         * Suppress monitor control msg (see erts_dsig_send_monitor)
         * by converting it to an empty (tick) packet.
         */
        int i;
        for (i = 1; i < ob->eiov->vsize; i++) {
            if (ob->eiov->binv[i])
                driver_free_binary(ob->eiov->binv[i]);
        }
        ob->eiov->vsize = 1;
        ob->eiov->size = 0;
        return reds;
    }

    hdr++;
    hopefull_flags = get_int64(hdr);

    hdr += 8;
    hopefull_ix = get_int32(hdr);

    if ((~dflags & DFLAG_SPAWN)
        && ep[0] == SMALL_TUPLE_EXT
        && ((ep[1] == 6
             && ep[2] == SMALL_INTEGER_EXT
             && ep[3] == DOP_SPAWN_REQUEST)
            || (ep[1] == 8
                && ep[2] == SMALL_INTEGER_EXT
                && ep[3] == DOP_SPAWN_REQUEST_TT))) {
        /*
         * Receiver does not support distributed spawn. Convert
         * this packet to an empty (tick) packet, and inform
         * spawning process that this is not supported...
         */
        ErtsHeapFactory factory;
        Eterm ctl_msg, ref, pid, token, *tp, *hp;
        Uint buf_sz;
        byte *buf_start, *buf_end;
        const byte *ptr;
        Uint hsz;
        int i;

        hdr += 4;
        payload_ix = get_int32(hdr);
        ASSERT(payload_ix >= 3);

        if (payload_ix == 3) {
            /* The whole control message is in iov[2].iov_base */
            buf_sz = (Uint) iov[2].iov_len;
            buf_start = (byte *) iov[2].iov_base;
            buf_end = buf_start + buf_sz;
        }
        else {
            /* Control message over multiple buffers... */
            int ix;
            buf_sz = 0;
            for (ix = 2; ix < payload_ix; ix++)
                buf_sz += iov[ix].iov_len;
            ptr = buf_start = erts_alloc(ERTS_ALC_T_TMP, buf_sz);
            buf_end = buf_start + buf_sz;
            for (ix = 2; ix < payload_ix; ix++) {
                sys_memcpy((void *) ptr,
                           (void *) iov[ix].iov_base,
                           iov[ix].iov_len);
                ptr += iov[ix].iov_len;
            }
        }

        hsz = decoded_size(buf_start, buf_end, 0, NULL);
        hp = erts_alloc(ERTS_ALC_T_TMP, hsz*sizeof(Eterm));
        erts_factory_tmp_init(&factory, hp, hsz, ERTS_ALC_T_TMP);
            
        ptr = dec_term(NULL, &factory, buf_start, &ctl_msg, NULL, 0);
        ASSERT(ptr); (void)ptr;

        ASSERT(is_tuple_arity(ctl_msg, 6)
               || is_tuple_arity(ctl_msg, 8));
        tp = tuple_val(ctl_msg);
        ASSERT(tp[1] == make_small(DOP_SPAWN_REQUEST)
               || tp[1] == make_small(DOP_SPAWN_REQUEST_TT));

        ref = tp[2];
        pid = tp[3];
        if (tp[1] == make_small(DOP_SPAWN_REQUEST))
            token = NIL;
        else {
            token = tp[8];
            erts_seq_trace_update_node_token(token);
        }
        ASSERT(is_internal_ordinary_ref(tp[2]));
        ASSERT(is_internal_pid(tp[3]));
        
        (void) erts_proc_sig_send_dist_spawn_reply(dep->sysname,
                                                   ref, pid,
                                                   NULL, am_notsup,
                                                   token);

        erts_factory_close(&factory);
        erts_free(ERTS_ALC_T_TMP, hp);
        if (buf_start != (byte *) iov[2].iov_base)
            erts_free(ERTS_ALC_T_TMP, buf_start);

        for (i = 1; i < ob->eiov->vsize; i++) {
            if (ob->eiov->binv[i])
                driver_free_binary(ob->eiov->binv[i]);
        }
        ob->eiov->vsize = 1;
        ob->eiov->size = 0;
        
        reds -= 4;
        
        if (reds < 0)
            return 0;
        return reds;
    }
    
    start_r = r = reds*ERTS_TRANSCODE_REDS_FACT;

    if (~dflags & hopefull_flags) {

        while (hopefull_ix != ERTS_NO_HIX) {
            Uint32 new_hopefull_ix;
            
            if (r <= 0) { /* yield... */
                /* save current hopefull_ix... */
                ep = (byte *) iov[1].iov_base;
                ep += 5;
                put_int32(hopefull_ix, ep);
                return -1;
            }

            /* Read next hopefull index */
            ep = (byte *) iov[hopefull_ix].iov_base;
            ep -= 4;
            new_hopefull_ix = get_int32(ep);
            ASSERT(new_hopefull_ix == ERTS_NO_HIX
                   || (hopefull_ix < new_hopefull_ix
                       && new_hopefull_ix < eiov->vsize));
            
            ep = (byte *) iov[hopefull_ix].iov_base;
            switch (*ep) {

            case EXPORT_EXT: {
                byte *start_ep, *end_ep;
                Eterm module, function;
                if (!(hopefull_flags & DFLAG_EXPORT_PTR_TAG))
                    break;
                /* Read original encoding... */
                ep++;
                start_ep = ep;
                ep = (byte*)dec_atom(NULL, ep, &module);
                ASSERT(ep && is_atom(module));
                ep = (byte*)dec_atom(NULL, ep, &function);
                ASSERT(ep && is_atom(function));
                end_ep = ep;
                ASSERT(*ep == SMALL_INTEGER_EXT
                       || *ep == INTEGER_EXT
                       || *ep == SMALL_BIG_EXT
                       || *ep == LARGE_BIG_EXT);

                /*
                 * module and function atoms are encoded
                 * between start_ep and end_ep. Prepend a
                 * 2-tuple tag before the atoms and
                 * remove arity at end.
                 */

                /* write fallback */

                ep = start_ep;
                ep--;
                put_int8(2, ep);
                ep--;
                *ep = SMALL_TUPLE_EXT;

                iov[hopefull_ix].iov_base = ep;

                /* Update iov sizes... */
                new_len = end_ep - ep;
                eiov->size -= iov[hopefull_ix].iov_len;
                eiov->size += new_len;
                iov[hopefull_ix].iov_len = new_len;
                r--;
                break;
            }

            case BIT_BINARY_EXT: {
                Uint bin_sz;
                byte bitsize, epilog_byte;
                ASSERT(hopefull_ix != ERTS_NO_HIX);
                if (!(hopefull_flags & DFLAG_BIT_BINARIES)) {
                    /* skip to epilog... */
                    hopefull_ix = new_hopefull_ix;
                    ep = (byte *) iov[hopefull_ix].iov_base;
                    ep -= 4;
                    new_hopefull_ix = get_int32(ep);
                    ASSERT(new_hopefull_ix == ERTS_NO_HIX
                           || (hopefull_ix < new_hopefull_ix
                               && new_hopefull_ix < eiov->vsize));
                    break;
                }

                /* read original encoded prolog... */
                ep++;
                bin_sz = get_uint32(ep);
                ep += 4;
                bitsize = *ep++;

                /* write fallback prolog... */
                iov[hopefull_ix].iov_base -= 4;
                ep = (byte *) iov[hopefull_ix].iov_base;

                *ep++ = SMALL_TUPLE_EXT;
                *ep++ = 2;
                *ep++ = BINARY_EXT;
                put_int32(bin_sz, ep);
                ep += 4;

                /* Update iov sizes... */
                new_len = ep - (byte *) iov[hopefull_ix].iov_base;
                eiov->size -= iov[hopefull_ix].iov_len;
                eiov->size += new_len;
                iov[hopefull_ix].iov_len = new_len;
                r--;
#ifdef DEBUG
                /*
                 * The binary data between the prolog and the
                 * epilog should be of size 'bin_sz - 1' and
                 * exists in the iov elements between prolog
                 * and epilog...
                 */
                {
                    Uint ix, debug_bin_sz = 0;
                    for (ix = hopefull_ix+1; ix < new_hopefull_ix; ix++)
                        debug_bin_sz += iov[ix].iov_len;
                    ASSERT(debug_bin_sz == bin_sz - 1);
                }
#endif
                /* jump to epilog... */
                hopefull_ix = new_hopefull_ix;
                ep = (byte *) iov[hopefull_ix].iov_base;

                /* read original encoded epilog... */
                epilog_byte = *ep;

                ASSERT(1 == iov[hopefull_ix].iov_len);

                iov[hopefull_ix].iov_base -= 4;
                ep = (byte *) iov[hopefull_ix].iov_base;
                new_hopefull_ix = get_int32(ep);
                ASSERT(new_hopefull_ix == ERTS_NO_HIX
                       || (hopefull_ix < new_hopefull_ix
                           && new_hopefull_ix < eiov->vsize));
                
                /* write fallback epilog... */

                *ep++ = epilog_byte;
                *ep++ = SMALL_INTEGER_EXT;
                *ep++ = bitsize;

                /* Update iov sizes... */
                new_len = ep - (byte *) iov[hopefull_ix].iov_base;
                eiov->size -= iov[hopefull_ix].iov_len;
                eiov->size += new_len;
                iov[hopefull_ix].iov_len = new_len;
                r--;
                break;
            }

            default:
                ERTS_INTERNAL_ERROR("Unexpected external tag");
                break;
            }

            hopefull_ix = new_hopefull_ix;
            r--;
        }
    }

    /*
     * Replace hopefull data header with actual header...
     */
    ep = (byte *) iov[1].iov_base;
    eiov->size -= iov[1].iov_len;

    if (dflags & (DFLAG_DIST_HDR_ATOM_CACHE|DFLAG_FRAGMENTS)) {
        /*
         * Encoding was done without atom caching but receiver expects
         * a dist header, so we prepend an empty one.
         */
        *ep++ = VERSION_MAGIC;
        *ep++ = DIST_HEADER;
        *ep++ = 0; /* NumberOfAtomCacheRefs */
    }
    else {
        hdr += 4;
        payload_ix = get_int32(hdr);

        if (payload_ix) {
            ASSERT(0 < payload_ix && payload_ix < eiov->vsize);
            /* Prepend version magic on payload. */
            iov[payload_ix].iov_base--;
            *((byte *) iov[payload_ix].iov_base) = VERSION_MAGIC;
            iov[payload_ix].iov_len++;
            eiov->size++;
            r--;
        }

        *ep++ = PASS_THROUGH;
        *ep++ = VERSION_MAGIC;
    }

    iov[1].iov_len = ep - (byte *) iov[1].iov_base;
    eiov->size += iov[1].iov_len;

    r--;

    /* done... */

    reds -= (start_r - r)/ERTS_TRANSCODE_REDS_FACT + 1;
    if (reds < 0)
        return 0;
    return reds;
}