int erts_net_message()

in erts/emulator/beam/dist.c [1803:2749]


/*
 * Handle one incoming distribution message on the connection 'conn_id'
 * towards the node represented by 'dep'.
 *
 * The message consists of a control message (a tuple whose first element
 * is a small integer DOP_* operation code), optionally followed by a
 * payload. Messages may arrive fragmented; fragments are collected in
 * dep->sequences and the signal is dispatched when the last fragment
 * (frag_id == 1) has arrived.
 *
 * Parameters:
 *   prt      Port the data arrived on, or NULL. If non-NULL it must be
 *            locked by the caller (checked by the lock checker).
 *   dep      Distribution entry of the sending node.
 *   conn_id  Id of the connection the data arrived on.
 *   hbuf     Not used by this function.
 *   hlen     Must be 0 (asserted).
 *   bin      Binary passed on to erts_prepare_dist_ext().
 *   buf,len  The received data. len == 0 is a tick and is ignored.
 *
 * Returns 0 when the message was handled or deliberately ignored (node not
 * alive, tick, connection closed, fragment stored while waiting for more).
 * Returns -1 when the data was invalid; in that case the connection has
 * been taken down using erts_kill_dist_connection().
 *
 * Ownership of the heap fragment (ede_hfrag) used for fragmented messages:
 * it is either handed over to the signal/message that is sent to the
 * receiving process, or freed here when the signal is dropped.
 *
 * NOTE(review): signals that never take ownership of ede_hfrag (for example
 * DOP_LINK) do not free it if they arrive fragmented -- confirm whether
 * such signals can be fragmented by a peer.
 */
int erts_net_message(Port *prt,
                     DistEntry *dep,
                     Uint32 conn_id,
                     byte *hbuf,
                     ErlDrvSizeT hlen,
                     Binary *bin,
                     byte *buf,
                     ErlDrvSizeT len)
{
    ErtsDistExternal ede, *edep = &ede;
    ErtsDistExternalData ede_data;
    ErlHeapFragment *ede_hfrag = NULL;
    Sint ctl_len;
    Eterm arg;
    Eterm from, to;
    Eterm watcher, watched;
    Eterm ref;
    Eterm *tuple;
    Eterm reason;
    Process* rp;
    DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
    Eterm* ctl = ctl_default;
    ErtsHeapFactory factory;
    Sint type;
    Eterm token;
    Uint tuple_arity;
    int res;
#ifdef ERTS_DIST_MSG_DBG
    ErlDrvSizeT orig_len = len;
#endif

    UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);

    ERTS_CHK_NO_PROC_LOCKS;

    ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt));

    if (!erts_is_alive) {
        UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
        return 0;
    }

    ASSERT(hlen == 0);

    if (len == 0) {  /* HANDLE TICK !!! */
        UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
        return 0;
    }

#ifdef ERTS_RAW_DIST_MSG_DBG
    erts_fprintf(dbg_file, "RECV: ");
    bw(buf, len);
#endif

    ede.data = &ede_data;

    res = erts_prepare_dist_ext(&ede, buf, len, bin, dep, conn_id, dep->cache);

    switch (res) {
    case ERTS_PREP_DIST_EXT_CLOSED:
        /* Connection not alive; ignore signal... */
        UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
        return 0;
    case ERTS_PREP_DIST_EXT_FAILED:
#ifdef ERTS_DIST_MSG_DBG
        erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n");
        bw(buf, orig_len);
#endif
        goto data_error;
    case ERTS_PREP_DIST_EXT_SUCCESS:
        ctl_len = erts_decode_dist_ext_size(&ede, 1, 0);
        if (ctl_len < 0) {
#ifdef ERTS_DIST_MSG_DBG
            erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n");
            bw(buf, orig_len);
#endif
            PURIFY_MSG("data error");
            goto data_error;
        }

        /* A non-fragmented message */
        if (!ede.data->seq_id) {
            /* Use the default (tmp heap) buffer unless it is too small */
            if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
                ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm));
            }

            erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF);
            break;
        } else {
            DistSeqNode *seq;
            Uint sz = erts_dist_ext_size(&ede);
            Uint used_sz = ctl_len * sizeof(Eterm);

            /* We calculate the size of the heap fragment to be allocated.
               The used_size part has to be larger than the ctl data and the
               DistSeqNode. */
            if (used_sz + (sizeof(ErlHeapFragment) - sizeof(Eterm)) < sizeof(DistSeqNode))
                used_sz = sizeof(DistSeqNode) - (sizeof(ErlHeapFragment) - sizeof(Eterm));

            seq = (DistSeqNode *)new_message_buffer((sz + used_sz) / sizeof(Eterm));
            seq->hfrag.used_size = used_sz / sizeof(Eterm);

            seq->ctl_len = ctl_len;
            seq->seq_id = ede.data->seq_id;
            seq->cnt = ede.data->frag_id;
            /* A sequence with this id must not already be in progress */
            if (dist_seq_rbt_lookup_insert(&dep->sequences, seq) != NULL) {
                free_message_buffer(&seq->hfrag);
                goto data_error;
            }

            erts_make_dist_ext_copy(&ede, erts_get_dist_ext(&seq->hfrag));

            if (ede.data->frag_id > 1) {
                /* More fragments to come; wait for them */
                seq->cnt--;
                UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
                return 0;
            }
        }

        /* fall through, the first fragment in the sequence was the last fragment */
    case ERTS_PREP_DIST_EXT_FRAG_CONT: {
        DistSeqNode *seq = dist_seq_rbt_lookup(dep->sequences, ede.data->seq_id);

        if (!seq)
            goto data_error;

        /* If we did a fall-through we already did this */
        if (res == ERTS_PREP_DIST_EXT_FRAG_CONT)
            erts_dist_ext_frag(&ede_data, erts_get_dist_ext(&seq->hfrag));

        /* Verify that the fragments have arrived in the correct order */
        if (seq->cnt != ede.data->frag_id)
            goto data_error;

        seq->cnt--;

        /* Check if this was the last fragment */
        if (ede.data->frag_id > 1) {
            UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
            return 0;
        }

        /* Last fragment arrived, time to dispatch the signal */
        dist_seq_rbt_delete(&dep->sequences, seq);
        ctl_len = seq->ctl_len;

        /* Now that we no longer need the DistSeqNode we re-use the heapfragment
           to decode the ctl msg into. We don't need the ctl message to be in
           the heapfragment, but we decode into the heapfragment speculatively
           in case there is a trace token that we need. */
        erts_factory_heap_frag_init(&factory, &seq->hfrag);
        edep = erts_get_dist_ext(&seq->hfrag);
        ede_hfrag = &seq->hfrag;

        break;
    }
    default:
        ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()");
        break;
    }

    /* Decode the control message */
    arg = erts_decode_dist_ext(&factory, edep, 1);
    if (is_non_value(arg)) {
#ifdef ERTS_DIST_MSG_DBG
        erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n");
        bw(buf, orig_len);
#endif
        PURIFY_MSG("data error");
        goto decode_error;
    }

    /* Fill the unused part of the hfrag with a bignum header */
    if (ede_hfrag && ede_hfrag->mem + ede_hfrag->used_size > factory.hp) {
        Uint slot = factory.hp - ede_hfrag->mem;
        ede_hfrag->mem[slot] = make_pos_bignum_header(ede_hfrag->used_size - slot - 1);
    }

    /* The control message must be a tuple with a small as first element */
    if (is_not_tuple(arg) ||
        (tuple = tuple_val(arg), (tuple_arity = arityval(*tuple)) < 1) ||
        is_not_small(tuple[1])) {
#ifdef ERTS_DIST_MSG_DBG
        if (is_tuple(arg) && arityval(*tuple) > 1)
            erts_fprintf(dbg_file, "RECV: CTL: %s: %.80T\n",
                         erts_dop_to_string(unsigned_val(tuple[1])), arg);
#endif
        goto invalid_message;
    }

#ifdef ERTS_DIST_MSG_DBG
    erts_fprintf(dbg_file, "RECV: CTL: %s: %.80T\n",
                 erts_dop_to_string(unsigned_val(tuple[1])), arg);
#endif

    token = NIL;

    switch (type = unsigned_val(tuple[1])) {
    case DOP_LINK: {
        ErtsDSigSendContext ctx;
        int code;

        if (tuple_arity != 3) {
            goto invalid_message;
        }
        from = tuple[2];
        to   = tuple[3];  /* local proc to link to */

        if (is_not_external_pid(from))
            goto invalid_message;

        if (dep != external_pid_dist_entry(from))
            goto invalid_message;

        if (is_external_pid(to)) {
            if (external_pid_dist_entry(to) != erts_this_dist_entry)
                goto invalid_message;
            /* old incarnation of node; reply noproc... */
        }
        else if (is_internal_pid(to)) {
            ErtsLinkData *ldp = erts_link_create(ERTS_LNK_TYPE_DIST_PROC,
                                                 from, to);
            ASSERT(ldp->a.other.item == to);
            ASSERT(eq(ldp->b.other.item, from));
            code = erts_link_dist_insert(&ldp->a, dep->mld);
            ASSERT(code);

            if (erts_proc_sig_send_link(NULL, to, &ldp->b))
                break; /* done */

            /* Failed to send signal; cleanup and reply noproc... */

            code = erts_link_dist_delete(&ldp->a);
            ASSERT(code);
            erts_link_release_both(ldp);
        }

        code = erts_dsig_prepare(&ctx, dep, NULL, 0, ERTS_DSP_NO_LOCK, 1, 1, 0);
        if (code == ERTS_DSIG_PREP_CONNECTED) {
            code = erts_dsig_send_exit(&ctx, to, from, am_noproc);
            ASSERT(code == ERTS_DSIG_SEND_OK);
        }

        break;
    }

    case DOP_UNLINK: {
        if (tuple_arity != 3) {
            goto invalid_message;
        }
        from = tuple[2];
        to = tuple[3];
        if (is_not_external_pid(from))
            goto invalid_message;
        if (dep != external_pid_dist_entry(from))
            goto invalid_message;

        /* 'to' refers to an old incarnation of this node; ignore it.
           Note that it is 'to' (not 'from') that has to be inspected;
           'from' is already known to belong to 'dep'. */
        if (is_external_pid(to)
            && erts_this_dist_entry == external_pid_dist_entry(to))
            break;

        if (is_not_internal_pid(to))
            goto invalid_message;

        erts_proc_sig_send_dist_unlink(dep, from, to);
        break;
    }

    case DOP_MONITOR_P: {
        /* A remote process wants to monitor us, we get:
           {DOP_MONITOR_P, Remote pid, local pid or name, ref} */
        Eterm pid, name;
        ErtsDSigSendContext ctx;
        int code;

        if (tuple_arity != 4) {
            goto invalid_message;
        }

        watcher = tuple[2];
        watched = tuple[3];  /* local proc to monitor */
        ref     = tuple[4];

        if (is_not_external_pid(watcher))
            goto invalid_message;
        else if (external_pid_dist_entry(watcher) != dep)
            goto invalid_message;

        if (is_not_ref(ref))
            goto invalid_message;

        if (is_internal_pid(watched)) {
            name = NIL;
            pid = watched;
        }
        else if (is_atom(watched)) {
            name = watched;
            pid = erts_whereis_name_to_id(NULL, watched);
            /* if port or undefined; reply noproc... */
        }
        else if (is_external_pid(watched)
                 && external_pid_dist_entry(watched) == erts_this_dist_entry) {
            name = NIL;
            pid = am_undefined; /* old incarnation; reply noproc... */
        }
        else
            goto invalid_message;

        if (is_internal_pid(pid)) {
            ErtsMonitorData *mdp;
            mdp = erts_monitor_create(ERTS_MON_TYPE_DIST_PROC,
                                      ref, watcher, pid, name);

            code = erts_monitor_dist_insert(&mdp->origin, dep->mld);
            ASSERT(code); (void)code;

            if (erts_proc_sig_send_monitor(&mdp->target, pid))
                break; /* done */

            /* Failed to send to local proc; cleanup reply noproc... */

            code = erts_monitor_dist_delete(&mdp->origin);
            ASSERT(code); (void)code;
            erts_monitor_release_both(mdp);

        }

        code = erts_dsig_prepare(&ctx, dep, NULL, 0, ERTS_DSP_NO_LOCK, 1, 1, 0);
        if (code == ERTS_DSIG_PREP_CONNECTED) {
            code = erts_dsig_send_m_exit(&ctx, watcher, watched, ref, am_noproc);
            ASSERT(code == ERTS_DSIG_SEND_OK);
        }

        break;
    }

    case DOP_DEMONITOR_P:
        /* A remote node informs us that a local pid is no longer monitored
           We get {DOP_DEMONITOR_P, Remote pid, Local pid or name, ref},
           We need only the ref of course */

        if (tuple_arity != 4) {
            goto invalid_message;
        }

        watcher = tuple[2];
        watched = tuple[3];
        ref = tuple[4];

        if (is_not_ref(ref)) {
            goto invalid_message;
        }

        if (is_not_external_pid(watcher) || external_pid_dist_entry(watcher) != dep)
            goto invalid_message;

        if (is_internal_pid(watched))
            erts_proc_sig_send_dist_demonitor(watched, ref);
        else if (is_external_pid(watched)
                 && external_pid_dist_entry(watched) == erts_this_dist_entry) {
            /* old incarnation; ignore it */
            ;
        }
        else if (is_atom(watched)) {
            /* Monitor by name; look it up among the name monitors of
               this connection and remove it if found */
            ErtsMonLnkDist *mld = dep->mld;
            ErtsMonitor *mon;

            erts_mtx_lock(&mld->mtx);

            mon = erts_monitor_tree_lookup(mld->orig_name_monitors, ref);
            if (mon)
                erts_monitor_tree_delete(&mld->orig_name_monitors, mon);

            erts_mtx_unlock(&mld->mtx);

            if (mon)
                erts_proc_sig_send_demonitor(mon);
        }
        else
            goto invalid_message;

        break;

    case DOP_REG_SEND_TT:
        if (tuple_arity != 5) {
            goto invalid_message;
        }

        /* Fall through ... */
    case DOP_REG_SEND:
        /* {DOP_REG_SEND, From, Cookie, ToName} -- Message */
        /* {DOP_REG_SEND_TT, From, Cookie, ToName, TraceToken} -- Message */

        /*
         * There is intentionally no testing of the cookie (it is always '')
         * from R9B and onwards.
         */
        if (type != DOP_REG_SEND_TT && tuple_arity != 4) {
            goto invalid_message;
        }

#ifdef ERTS_DIST_MSG_DBG
        dist_msg_dbg(edep, "MSG", buf, orig_len);
#endif

        from = tuple[2];
        to = tuple[4];
        if (is_not_pid(from) || is_not_atom(to)){
            goto invalid_message;
        }
        rp = erts_whereis_process(NULL, 0, to, 0, 0);
        if (rp) {
            ErtsProcLocks locks = 0;

            if (type == DOP_REG_SEND) {
                token = NIL;
            } else {
                token = tuple[5];
            }

            erts_queue_dist_message(rp, locks, edep, ede_hfrag, token, from);

            if (locks)
                erts_proc_unlock(rp, locks);
        } else if (ede_hfrag != NULL) {
            /* No receiver; drop the message */
            erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
            free_message_buffer(ede_hfrag);
        }
        break;

    case DOP_SEND_SENDER_TT: {
    case DOP_SEND_TT:

        if (tuple_arity != 4) {
            goto invalid_message;
        }

        token = tuple[4];
        goto send_common;

    case DOP_SEND_SENDER:
    case DOP_SEND:

        token = NIL;
        if (tuple_arity != 3)
            goto invalid_message;

    send_common:

        /*
         * If DOP_SEND_SENDER or DOP_SEND_SENDER_TT element 2 contains
         * the sender pid (i.e. DFLAG_SEND_SENDER is set); otherwise,
         * the atom '' (empty cookie).
         */
        ASSERT((type == DOP_SEND_SENDER || type == DOP_SEND_SENDER_TT)
               ? (is_pid(tuple[2]) && (dep->dflags & DFLAG_SEND_SENDER))
               : tuple[2] == am_Empty);

#ifdef ERTS_DIST_MSG_DBG
        dist_msg_dbg(edep, "MSG", buf, orig_len);
#endif
        to = tuple[3];
        if (is_not_pid(to)) {
            goto invalid_message;
        }
        rp = erts_proc_lookup(to);

        if (rp) {
            ErtsProcLocks locks = 0;

            erts_queue_dist_message(rp, locks, edep, ede_hfrag, token, am_Empty);
            if (locks)
                erts_proc_unlock(rp, locks);
        } else if (ede_hfrag != NULL) {
            /* No receiver; drop the message */
            erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
            free_message_buffer(ede_hfrag);
        }

        break;
    }

    case DOP_PAYLOAD_MONITOR_P_EXIT:
    case DOP_MONITOR_P_EXIT: {

        /* We are monitoring a process on the remote node which dies, we get
           {DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */

        if (type == DOP_PAYLOAD_MONITOR_P_EXIT) {
            if (tuple_arity != 4) {
                goto invalid_message;
            }
            reason = THE_NON_VALUE; /* reason is in the payload */
        } else {
            if (tuple_arity != 5) {
                goto invalid_message;
            }
            reason = tuple[5];
            edep = NULL;
        }

        /* Elements are read only after the arity has been verified */
        watched = tuple[2];  /* remote proc or name which died */
        watcher = tuple[3];
        ref     = tuple[4];

        if (is_not_ref(ref))
            goto invalid_message;

        if (is_not_external_pid(watched) && is_not_atom(watched))
            goto invalid_message;

        if (is_not_internal_pid(watcher)) {
            if (!is_external_pid(watcher))
                goto invalid_message;
            if (erts_this_dist_entry == external_pid_dist_entry(watcher)) {
                /* Old incarnation of this node; drop the signal */
                if (ede_hfrag != NULL) {
                    erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
                    free_message_buffer(ede_hfrag);
                }
                break;
            }
            goto invalid_message;
        }

        if (!erts_proc_lookup(watcher)) {
            if (ede_hfrag != NULL) {
                erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
                free_message_buffer(ede_hfrag);
            }
            break; /* Process not alive */
        }

#ifdef ERTS_DIST_MSG_DBG
        if (reason == THE_NON_VALUE) {
            dist_msg_dbg(edep, "MSG", buf, orig_len);
        }
#endif

        erts_proc_sig_send_dist_monitor_down(
            dep, ref, watched, watcher, edep, ede_hfrag, reason);
        break;
    }

    case DOP_PAYLOAD_EXIT:
    case DOP_PAYLOAD_EXIT_TT:
    case DOP_EXIT_TT:
    case DOP_EXIT: {

        /* 'from', which 'to' is linked to, died */

        if (type == DOP_EXIT) {
            if (tuple_arity != 4) {
                goto invalid_message;
            }
            token = NIL;
            reason = tuple[4];
            edep = NULL;
        } else if (type == DOP_EXIT_TT){
            if (tuple_arity != 5) {
                goto invalid_message;
            }
            token = tuple[4];
            reason = tuple[5];
            edep = NULL;
        } else if (type == DOP_PAYLOAD_EXIT) {
            if (tuple_arity != 3) {
                goto invalid_message;
            }
            token = NIL;
            reason = THE_NON_VALUE;
        } else {
            if (tuple_arity != 4) {
                goto invalid_message;
            }
            token = tuple[4];
            reason = THE_NON_VALUE;
        }

        /* Elements are read only after the arity has been verified */
        from = tuple[2];
        to = tuple[3];

        if (is_not_external_pid(from)
            || dep != external_pid_dist_entry(from)
            || is_not_internal_pid(to)) {
            goto invalid_message;
        }

        if (!erts_proc_lookup(to)) {
            if (ede_hfrag != NULL) {
                erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
                free_message_buffer(ede_hfrag);
            }
            break; /* Process not alive */
        }

#ifdef ERTS_DIST_MSG_DBG
        if (reason == THE_NON_VALUE) {
            dist_msg_dbg(edep, "MSG", buf, orig_len);
        }
#endif

        erts_proc_sig_send_dist_link_exit(dep,
                                          from, to, edep, ede_hfrag,
                                          reason, token);
        break;
    }
    case DOP_PAYLOAD_EXIT2_TT:
    case DOP_PAYLOAD_EXIT2:
    case DOP_EXIT2_TT:
    case DOP_EXIT2: {

        /* 'from' is sending an exit signal to 'to' */

        if (type == DOP_EXIT2) {
            if (tuple_arity != 4) {
                goto invalid_message;
            }
            reason = tuple[4];
            token = NIL;
            edep = NULL;
        } else if (type == DOP_EXIT2_TT) {
            if (tuple_arity != 5) {
                goto invalid_message;
            }
            token = tuple[4];
            reason = tuple[5];
            edep = NULL;
        } else if (type == DOP_PAYLOAD_EXIT2) {
            if (tuple_arity != 3) {
                goto invalid_message;
            }
            reason = THE_NON_VALUE;
            token = NIL;
        } else {
            if (tuple_arity != 4) {
                goto invalid_message;
            }
            reason = THE_NON_VALUE;
            token = tuple[4];
        }

        /* Elements are read only after the arity has been verified */
        from = tuple[2];
        to = tuple[3];

        /* 'from' must be an external pid before its dist entry
           may be looked up */
        if (is_not_external_pid(from)
            || dep != external_pid_dist_entry(from)) {
            goto invalid_message;
        }
        if (is_not_internal_pid(to)) {
            if (is_external_pid(to)) {
                DistEntry *to_dep = external_pid_dist_entry(to);
                if (to_dep == erts_this_dist_entry) {
                    /* Old incarnation of this node; drop the signal */
                    if (ede_hfrag != NULL) {
                        erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
                        free_message_buffer(ede_hfrag);
                    }
                    break;
                }
            }
            goto invalid_message;
        }

        if (!erts_proc_lookup(to)) {
            if (ede_hfrag != NULL) {
                erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
                free_message_buffer(ede_hfrag);
            }
            break; /* Process not alive */
        }

#ifdef ERTS_DIST_MSG_DBG
        if (reason == THE_NON_VALUE) {
            dist_msg_dbg(edep, "MSG", buf, orig_len);
        }
#endif

        erts_proc_sig_send_dist_exit(dep, from, to, edep, ede_hfrag, reason, token);
        break;
    }
    case DOP_GROUP_LEADER:
        if (tuple_arity != 3) {
            goto invalid_message;
        }
        from = tuple[2];   /* Group leader  */
        to = tuple[3];     /* new member */
        if (is_not_pid(from) || is_not_pid(to)) {
            goto invalid_message;
        }

        (void) erts_proc_sig_send_group_leader(NULL, to, from, NIL);
        break;

    case DOP_SPAWN_REQUEST_TT: {
        Eterm tmp_heap[2];
        ErlSpawnOpts so;
        int code, opts_error;
        Eterm pid, error, ref, from, gl, mfa, opts, token, args;
        /* {DOP_SPAWN_REQUEST_TT, Ref, From, GL, MFA, Opts, Token} */

        if (tuple_arity != 7)
            goto invalid_message;

        token = tuple[7];

        if (0) {

        case DOP_SPAWN_REQUEST:
            /* {DOP_SPAWN_REQUEST, Ref, From, GL, MFA, Opts} */
            if (tuple_arity != 6)
                goto invalid_message;

            token = NIL;
        }

        ref = tuple[2];
        from = tuple[3];
        gl = tuple[4];
        mfa = tuple[5];
        opts = tuple[6];

        if (is_not_external_ref(ref))
            goto invalid_message;
        if (is_not_external_pid(from))
            goto invalid_message;
        if (is_not_pid(gl))
            goto invalid_message;
        if (is_not_tuple_arity(mfa, 3))
            goto invalid_message;
        else {
            /* {Module, Function, Arity} */
            Eterm *tp = tuple_val(tuple[5]);
            if (is_not_atom(tp[1]))
                goto invalid_message;
            if (is_not_atom(tp[2]))
                goto invalid_message;
            if (is_not_small(tp[3]))
                goto invalid_message;
        }

        opts_error = erts_parse_spawn_opts(&so, opts, NULL, 0);
        if (opts_error) {
            ErtsDSigSendContext ctx;
            if (opts_error > 1)
                goto invalid_message;
            error = am_badopt;
        dist_spawn_error:
            /*
             * Note that the heap fragment must not be freed until
             * after the reply has been sent. When the request was
             * fragmented, the control message (and by that ref, from,
             * gl, mfa, opts and token) is located in the heap fragment.
             */
            if (have_seqtrace(token)) {
                /*
                 * See erl_create_process() for why we do this
                 * serial trickery...
                 */
                Eterm tmp_heap[7];
                Eterm seq_msg;
                Eterm serial;
                Uint serial_num;
                /* Receiver spawn_request... */
                serial = SEQ_TRACE_T_SERIAL(token);
                serial_num = unsigned_val(serial);
                serial_num--;
                serial = make_small(serial_num);
                SEQ_TRACE_T_SERIAL(token) = serial;
                seq_msg = TUPLE6(&tmp_heap[0], am_spawn_request,
                                 ref, from, gl, mfa, opts);
                seq_trace_output(token, seq_msg, SEQ_TRACE_RECEIVE,
                                 erts_this_dist_entry->sysname, NULL);
                SEQ_TRACE_T_LASTCNT(token) = serial;
                /* Send spawn_reply... */
                erts_seq_trace_update_node_token(token);
                seq_msg = TUPLE4(&tmp_heap[0],
                                 am_spawn_reply, ref, am_error, error);
                seq_trace_output(token, seq_msg, SEQ_TRACE_SEND, from, NULL);
            }
            code = erts_dsig_prepare(&ctx, dep, NULL, 0,
                                     ERTS_DSP_NO_LOCK, 1, 1, 0);
            if (code == ERTS_DSIG_PREP_CONNECTED) {
                code = erts_dsig_send_spawn_reply(&ctx,
                                                  tuple[2],
                                                  tuple[3],
                                                  make_small(0),
                                                  error,
                                                  token);
                ASSERT(code == ERTS_DSIG_SEND_OK);
            }
            /* All terms of the control message have been used; now it
               is safe to release the heap fragment */
            if (ede_hfrag) {
                erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
                free_message_buffer(ede_hfrag);
            }
            break;
        }

        so.mref = ref;
        so.tag = am_spawn_reply;
        so.parent_id = from;
        so.group_leader = gl;
        so.mfa = mfa;
        so.dist_entry = dep;
        so.edep = edep;
        so.ede_hfrag = ede_hfrag;
        so.token = token;
        so.opts = opts;

        args = CONS(&tmp_heap[0], mfa, NIL);
        pid = erl_create_process(NULL,
                                 am_erts_internal,
                                 am_dist_spawn_init,
                                 args,
                                 &so);
        if (is_non_value(pid)) {
            if (so.error_code == SYSTEM_LIMIT)
                error = am_system_limit;
            else
                goto invalid_message; /* should not happen */
            goto dist_spawn_error;
        }

        break;
    }

    case DOP_SPAWN_REPLY_TT: {
        ErtsLinkData *ldp;
        ErtsLink *lnk;
        int monitor;
        Eterm ref, result, flags_term, parent, token;
        Uint flags;

        /* {DOP_SPAWN_REPLY_TT, Ref, To, Flags, From, Token} */
        if (tuple_arity != 6)
            goto invalid_message;

        token = tuple[6];

        if (0) {
        case DOP_SPAWN_REPLY:

            /* {DOP_SPAWN_REPLY, Ref, To, Flags, From} */
            if (tuple_arity != 5)
                goto invalid_message;

            token = NIL;
        }

        ldp = NULL;
        lnk = NULL;
        monitor = 0;

        ref = tuple[2];
        parent = tuple[3];
        flags_term = tuple[4];
        result = tuple[5];

        if (is_not_internal_pid(parent)) {
            if (is_external_pid(parent)) {
                DistEntry *dep = external_pid_dist_entry(parent);
                if (dep == erts_this_dist_entry)
                    break; /* Old incarnation of this node... */
            }
            goto invalid_message;
        }

        if (is_not_internal_ref(ref)) {
            if (is_external_ref(ref)) {
                DistEntry *dep = external_ref_dist_entry(ref);
                if (dep == erts_this_dist_entry)
                    break; /* Old incarnation of this node... */
            }
            goto invalid_message;
        }

        if (is_not_small(flags_term))
            goto invalid_message;

        flags = unsigned_val(flags_term);
        if (flags >= (1 << 27))
            goto invalid_message;

        /* Result is either the pid of the new process or an error atom */
        if (is_not_external_pid(result) && is_not_atom(result))
            goto invalid_message;

        if (is_external_pid(result)) {

            monitor = !!(flags & ERTS_DIST_SPAWN_FLAG_MONITOR);

            if (flags & ERTS_DIST_SPAWN_FLAG_LINK) {
                /* Successful spawn-link... */
                int code;

                ldp = erts_link_create(ERTS_LNK_TYPE_DIST_PROC,
                                       result, parent);
                ASSERT(ldp->a.other.item == parent);
                ASSERT(eq(ldp->b.other.item, result));
                code = erts_link_dist_insert(&ldp->a, dep->mld);
                ASSERT(code); (void)code;

                lnk = &ldp->b;
            }
        }

        if (!erts_proc_sig_send_dist_spawn_reply(dep->sysname, ref,
                                                 parent, lnk, result,
                                                 token)) {
            /* Failed to deliver the reply to the parent; clean up... */
            ErtsDSigSendContext ctx;
            int code;

            if (monitor) {
                code = erts_dsig_prepare(&ctx, dep, NULL, 0,
                                         ERTS_DSP_NO_LOCK, 1, 1, 0);
                if (code == ERTS_DSIG_PREP_CONNECTED) {
                    code = erts_dsig_send_demonitor(&ctx, parent,
                                                    result, ref);
                    ASSERT(code == ERTS_DSIG_SEND_OK);
                }
            }

            if (lnk) {

                code = erts_link_dist_delete(&ldp->a);
                ASSERT(code);
                erts_link_release_both(ldp);
            }

            if (flags & ERTS_DIST_SPAWN_FLAG_LINK) {
                /*
                 * Save info so the terminating parent can send us
                 * an exit signal with the correct exit reason...
                 */
                dist_pend_spawn_exit_save_child_result(result,
                                                       ref,
                                                       dep->mld);
            }
        }

        break;
    }

    default:
        goto invalid_message;
    }

    /* Signal handled. When no heap fragment is used, the control message
       was decoded into the tmp factory which we have to clean up. */
    if (ede_hfrag == NULL) {
        erts_factory_close(&factory);
        if (ctl != ctl_default) {
            erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
        }
    }
    UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
    ERTS_CHK_NO_PROC_LOCKS;
    return 0;
 invalid_message:
    {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg);
        erts_send_error_to_logger_nogl(dsbufp);
    }
decode_error:
    PURIFY_MSG("data error");
    if (ede_hfrag == NULL) {
        erts_factory_close(&factory);
        if (ctl != ctl_default) {
            erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
        }
    } else {
        erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
        free_message_buffer(ede_hfrag);
    }
data_error:
    /* Invalid data from the other node; take down the connection */
    UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
    erts_kill_dist_connection(dep, conn_id);
    ERTS_CHK_NO_PROC_LOCKS;
    return -1;
}