in erts/emulator/beam/external.c [5720:6098]
Sint transcode_dist_obuf(ErtsDistOutputBuf* ob,
DistEntry* dep,
Uint64 dflags,
Sint reds)
{
ErlIOVec* eiov = ob->eiov;
SysIOVec* iov = eiov->iov;
byte *hdr;
Uint64 hopefull_flags;
Uint32 hopefull_ix, payload_ix;
Sint start_r, r;
Uint new_len;
byte *ep;
if (reds < 0)
return reds;
/*
* HOPEFUL_DATA header always present in io vector
* element 1:
*
* +---+--------------+-----------+----------+
* |'H'|Hopefull Flags|Hopefull IX|Payload IX|
* +---+--------------+-----------+----------+
* 1 8 4 4
*
* Hopefull flags: Flags corresponding to actual
* hopefull encodings in this
* buffer.
* Hopefull IX: Vector index of first hopefull
* encoding. Each hopefull encoding
* is preceeded by 4 bytes containing
* next vector index of hopefull
* encoding. ERTS_NO_HIX marks the
* end.
* Payload IX: Vector index of the beginning
* of the payload if there is
* one; otherwise, zero.
*/
hdr = (byte *) iov[1].iov_base;
ASSERT(HOPEFUL_DATA == *((byte *)iov[1].iov_base));
ASSERT(iov[1].iov_len == 1+8+4+4);
/* Control message always begin in vector element 2 */
ep = iov[2].iov_base;
ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT);
if (~dflags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)
&& ep[0] == SMALL_TUPLE_EXT
&& ep[1] == 4
&& ep[2] == SMALL_INTEGER_EXT
&& (ep[3] == DOP_MONITOR_P ||
ep[3] == DOP_MONITOR_P_EXIT ||
ep[3] == DOP_DEMONITOR_P)) {
/*
* Receiver does not support process monitoring.
* Suppress monitor control msg (see erts_dsig_send_monitor)
* by converting it to an empty (tick) packet.
*/
int i;
for (i = 1; i < ob->eiov->vsize; i++) {
if (ob->eiov->binv[i])
driver_free_binary(ob->eiov->binv[i]);
}
ob->eiov->vsize = 1;
ob->eiov->size = 0;
return reds;
}
hdr++;
hopefull_flags = get_int64(hdr);
hdr += 8;
hopefull_ix = get_int32(hdr);
if ((~dflags & DFLAG_SPAWN)
&& ep[0] == SMALL_TUPLE_EXT
&& ((ep[1] == 6
&& ep[2] == SMALL_INTEGER_EXT
&& ep[3] == DOP_SPAWN_REQUEST)
|| (ep[1] == 8
&& ep[2] == SMALL_INTEGER_EXT
&& ep[3] == DOP_SPAWN_REQUEST_TT))) {
/*
* Receiver does not support distributed spawn. Convert
* this packet to an empty (tick) packet, and inform
* spawning process that this is not supported...
*/
ErtsHeapFactory factory;
Eterm ctl_msg, ref, pid, token, *tp, *hp;
Uint buf_sz;
byte *buf_start, *buf_end;
const byte *ptr;
Uint hsz;
int i;
hdr += 4;
payload_ix = get_int32(hdr);
ASSERT(payload_ix >= 3);
if (payload_ix == 3) {
/* The whole control message is in iov[2].iov_base */
buf_sz = (Uint) iov[2].iov_len;
buf_start = (byte *) iov[2].iov_base;
buf_end = buf_start + buf_sz;
}
else {
/* Control message over multiple buffers... */
int ix;
buf_sz = 0;
for (ix = 2; ix < payload_ix; ix++)
buf_sz += iov[ix].iov_len;
ptr = buf_start = erts_alloc(ERTS_ALC_T_TMP, buf_sz);
buf_end = buf_start + buf_sz;
for (ix = 2; ix < payload_ix; ix++) {
sys_memcpy((void *) ptr,
(void *) iov[ix].iov_base,
iov[ix].iov_len);
ptr += iov[ix].iov_len;
}
}
hsz = decoded_size(buf_start, buf_end, 0, NULL);
hp = erts_alloc(ERTS_ALC_T_TMP, hsz*sizeof(Eterm));
erts_factory_tmp_init(&factory, hp, hsz, ERTS_ALC_T_TMP);
ptr = dec_term(NULL, &factory, buf_start, &ctl_msg, NULL, 0);
ASSERT(ptr); (void)ptr;
ASSERT(is_tuple_arity(ctl_msg, 6)
|| is_tuple_arity(ctl_msg, 8));
tp = tuple_val(ctl_msg);
ASSERT(tp[1] == make_small(DOP_SPAWN_REQUEST)
|| tp[1] == make_small(DOP_SPAWN_REQUEST_TT));
ref = tp[2];
pid = tp[3];
if (tp[1] == make_small(DOP_SPAWN_REQUEST))
token = NIL;
else {
token = tp[8];
erts_seq_trace_update_node_token(token);
}
ASSERT(is_internal_ordinary_ref(tp[2]));
ASSERT(is_internal_pid(tp[3]));
(void) erts_proc_sig_send_dist_spawn_reply(dep->sysname,
ref, pid,
NULL, am_notsup,
token);
erts_factory_close(&factory);
erts_free(ERTS_ALC_T_TMP, hp);
if (buf_start != (byte *) iov[2].iov_base)
erts_free(ERTS_ALC_T_TMP, buf_start);
for (i = 1; i < ob->eiov->vsize; i++) {
if (ob->eiov->binv[i])
driver_free_binary(ob->eiov->binv[i]);
}
ob->eiov->vsize = 1;
ob->eiov->size = 0;
reds -= 4;
if (reds < 0)
return 0;
return reds;
}
start_r = r = reds*ERTS_TRANSCODE_REDS_FACT;
if (~dflags & hopefull_flags) {
while (hopefull_ix != ERTS_NO_HIX) {
Uint32 new_hopefull_ix;
if (r <= 0) { /* yield... */
/* save current hopefull_ix... */
ep = (byte *) iov[1].iov_base;
ep += 5;
put_int32(hopefull_ix, ep);
return -1;
}
/* Read next hopefull index */
ep = (byte *) iov[hopefull_ix].iov_base;
ep -= 4;
new_hopefull_ix = get_int32(ep);
ASSERT(new_hopefull_ix == ERTS_NO_HIX
|| (hopefull_ix < new_hopefull_ix
&& new_hopefull_ix < eiov->vsize));
ep = (byte *) iov[hopefull_ix].iov_base;
switch (*ep) {
case EXPORT_EXT: {
byte *start_ep, *end_ep;
Eterm module, function;
if (!(hopefull_flags & DFLAG_EXPORT_PTR_TAG))
break;
/* Read original encoding... */
ep++;
start_ep = ep;
ep = (byte*)dec_atom(NULL, ep, &module);
ASSERT(ep && is_atom(module));
ep = (byte*)dec_atom(NULL, ep, &function);
ASSERT(ep && is_atom(function));
end_ep = ep;
ASSERT(*ep == SMALL_INTEGER_EXT
|| *ep == INTEGER_EXT
|| *ep == SMALL_BIG_EXT
|| *ep == LARGE_BIG_EXT);
/*
* module and function atoms are encoded
* between start_ep and end_ep. Prepend a
* 2-tuple tag before the atoms and
* remove arity at end.
*/
/* write fallback */
ep = start_ep;
ep--;
put_int8(2, ep);
ep--;
*ep = SMALL_TUPLE_EXT;
iov[hopefull_ix].iov_base = ep;
/* Update iov sizes... */
new_len = end_ep - ep;
eiov->size -= iov[hopefull_ix].iov_len;
eiov->size += new_len;
iov[hopefull_ix].iov_len = new_len;
r--;
break;
}
case BIT_BINARY_EXT: {
Uint bin_sz;
byte bitsize, epilog_byte;
ASSERT(hopefull_ix != ERTS_NO_HIX);
if (!(hopefull_flags & DFLAG_BIT_BINARIES)) {
/* skip to epilog... */
hopefull_ix = new_hopefull_ix;
ep = (byte *) iov[hopefull_ix].iov_base;
ep -= 4;
new_hopefull_ix = get_int32(ep);
ASSERT(new_hopefull_ix == ERTS_NO_HIX
|| (hopefull_ix < new_hopefull_ix
&& new_hopefull_ix < eiov->vsize));
break;
}
/* read original encoded prolog... */
ep++;
bin_sz = get_uint32(ep);
ep += 4;
bitsize = *ep++;
/* write fallback prolog... */
iov[hopefull_ix].iov_base -= 4;
ep = (byte *) iov[hopefull_ix].iov_base;
*ep++ = SMALL_TUPLE_EXT;
*ep++ = 2;
*ep++ = BINARY_EXT;
put_int32(bin_sz, ep);
ep += 4;
/* Update iov sizes... */
new_len = ep - (byte *) iov[hopefull_ix].iov_base;
eiov->size -= iov[hopefull_ix].iov_len;
eiov->size += new_len;
iov[hopefull_ix].iov_len = new_len;
r--;
#ifdef DEBUG
/*
* The binary data between the prolog and the
* epilog should be of size 'bin_sz - 1' and
* exists in the iov elements between prolog
* and epilog...
*/
{
Uint ix, debug_bin_sz = 0;
for (ix = hopefull_ix+1; ix < new_hopefull_ix; ix++)
debug_bin_sz += iov[ix].iov_len;
ASSERT(debug_bin_sz == bin_sz - 1);
}
#endif
/* jump to epilog... */
hopefull_ix = new_hopefull_ix;
ep = (byte *) iov[hopefull_ix].iov_base;
/* read original encoded epilog... */
epilog_byte = *ep;
ASSERT(1 == iov[hopefull_ix].iov_len);
iov[hopefull_ix].iov_base -= 4;
ep = (byte *) iov[hopefull_ix].iov_base;
new_hopefull_ix = get_int32(ep);
ASSERT(new_hopefull_ix == ERTS_NO_HIX
|| (hopefull_ix < new_hopefull_ix
&& new_hopefull_ix < eiov->vsize));
/* write fallback epilog... */
*ep++ = epilog_byte;
*ep++ = SMALL_INTEGER_EXT;
*ep++ = bitsize;
/* Update iov sizes... */
new_len = ep - (byte *) iov[hopefull_ix].iov_base;
eiov->size -= iov[hopefull_ix].iov_len;
eiov->size += new_len;
iov[hopefull_ix].iov_len = new_len;
r--;
break;
}
default:
ERTS_INTERNAL_ERROR("Unexpected external tag");
break;
}
hopefull_ix = new_hopefull_ix;
r--;
}
}
/*
* Replace hopefull data header with actual header...
*/
ep = (byte *) iov[1].iov_base;
eiov->size -= iov[1].iov_len;
if (dflags & (DFLAG_DIST_HDR_ATOM_CACHE|DFLAG_FRAGMENTS)) {
/*
* Encoding was done without atom caching but receiver expects
* a dist header, so we prepend an empty one.
*/
*ep++ = VERSION_MAGIC;
*ep++ = DIST_HEADER;
*ep++ = 0; /* NumberOfAtomCacheRefs */
}
else {
hdr += 4;
payload_ix = get_int32(hdr);
if (payload_ix) {
ASSERT(0 < payload_ix && payload_ix < eiov->vsize);
/* Prepend version magic on payload. */
iov[payload_ix].iov_base--;
*((byte *) iov[payload_ix].iov_base) = VERSION_MAGIC;
iov[payload_ix].iov_len++;
eiov->size++;
r--;
}
*ep++ = PASS_THROUGH;
*ep++ = VERSION_MAGIC;
}
iov[1].iov_len = ep - (byte *) iov[1].iov_base;
eiov->size += iov[1].iov_len;
r--;
/* done... */
reds -= (start_r - r)/ERTS_TRANSCODE_REDS_FACT + 1;
if (reds < 0)
return 0;
return reds;
}