static int compose_router_message_annotations()

in src/message.c [1693:1840]


static int compose_router_message_annotations(qd_message_pvt_t *msg, uint8_t *ma_header,
                                              qd_buffer_list_t *ma_trailer)
{
    qd_message_content_t *content = msg->content;

    // account for any user annotations to be sent before the router annotations
    //
    uint32_t mcount = content->ma_user_count;
    uint32_t msize  = content->ma_user_annotations.remaining;

    if (msg->ma_phase) {
        assert(msg->ma_phase < 128); // smallint
        mcount += 2;

        // key:
        msize += QD_MA_PHASE_ENCODED_LEN;
        qd_buffer_list_append(ma_trailer, QD_MA_PHASE_ENCODED, QD_MA_PHASE_ENCODED_LEN);

        // value:
        msize += 2;  // tag + 1 byte value
        uint8_t ma_phase[2];
        ma_phase[0] = QD_AMQP_SMALLINT;
        ma_phase[1] = msg->ma_phase;
        qd_buffer_list_append(ma_trailer, ma_phase, 2);
    }

    if (msg->ma_streaming) {
        mcount += 2;

        // key:
        msize += QD_MA_STREAM_ENCODED_LEN;
        qd_buffer_list_append(ma_trailer, QD_MA_STREAM_ENCODED, QD_MA_STREAM_ENCODED_LEN);

        // value: historically sent as int value 1:
        msize += 2;
        const uint8_t streaming[2] = {QD_AMQP_SMALLINT, 1};
        qd_buffer_list_append(ma_trailer, streaming, 2);
    }

    if (msg->ma_to_override || content->ma_pf_to_override) {
        mcount += 2;

        // key:
        msize += QD_MA_TO_ENCODED_LEN;
        qd_buffer_list_append(ma_trailer, QD_MA_TO_ENCODED, QD_MA_TO_ENCODED_LEN);

        // value: message specific value takes precedence over value in
        // original received message to allow overriding the to-override
        uint8_t hdr[5];  // max length of encoded str8/32 header
        if (msg->ma_to_override) {
            const size_t str_len = strlen(msg->ma_to_override);
            const int hdr_len = qd_compose_str_header(hdr, str_len);

            msize += hdr_len;
            qd_buffer_list_append(ma_trailer, hdr, hdr_len);

            msize += str_len;
            qd_buffer_list_append(ma_trailer, (uint8_t*) msg->ma_to_override, str_len);

        } else {
            qd_buffer_field_t to = qd_parse_value(content->ma_pf_to_override);
            const int hdr_len = qd_compose_str_header(hdr, to.remaining);

            msize += hdr_len;
            qd_buffer_list_append(ma_trailer, hdr, hdr_len);

            msize += to.remaining;
            qd_buffer_list_append_field(ma_trailer, &to);
        }
    }

    if (!msg->ma_filter_ingress) {
        mcount += 2;

        // key
        msize += QD_MA_INGRESS_ENCODED_LEN;
        qd_buffer_list_append(ma_trailer, QD_MA_INGRESS_ENCODED, QD_MA_INGRESS_ENCODED_LEN);

        // value: use original value if present, else the local node is the
        // ingress
        if (content->ma_pf_ingress && !msg->ma_reset_ingress) {
            uint8_t hdr[5];   // max size str8/32 header
            qd_buffer_field_t ingress = qd_parse_value(content->ma_pf_ingress);
            const int hdr_len = qd_compose_str_header(hdr, ingress.remaining);

            msize += hdr_len;
            qd_buffer_list_append(ma_trailer, hdr, hdr_len);

            msize += ingress.remaining;
            qd_buffer_list_append_field(ma_trailer, &ingress);

        } else {
            size_t node_id_len;
            const uint8_t *node_id = qd_router_id_encoded(&node_id_len);
            msize += node_id_len;
            qd_buffer_list_append(ma_trailer, node_id, node_id_len);
        }
    }

    if (!msg->ma_filter_trace) {
        mcount += 2;
        size_t node_id_len;
        const uint8_t *node_id = qd_router_id_encoded(&node_id_len);
        uint32_t trace_count = 1;  // local node
        uint32_t trace_len = node_id_len;
        const bool use_incoming = content->ma_pf_trace && !msg->ma_reset_trace;

        // key
        msize += QD_MA_TRACE_ENCODED_LEN;
        qd_buffer_list_append(ma_trailer, QD_MA_TRACE_ENCODED, QD_MA_TRACE_ENCODED_LEN);

        // value: first compute trace list size and count since the list header
        // must be written first
        qd_buffer_field_t in_trace = {0};
        if (use_incoming) {
            in_trace = qd_parse_value(content->ma_pf_trace);
            trace_len += in_trace.remaining;
            trace_count += qd_parse_sub_count(content->ma_pf_trace);
        }

        uint8_t list_hdr[9];  // max len encoded list header
        const int hdr_len = qd_compose_list_header(list_hdr, trace_len, trace_count);

        msize += hdr_len;
        qd_buffer_list_append(ma_trailer, list_hdr, hdr_len);

        if (use_incoming) {
            msize += in_trace.remaining;
            qd_buffer_list_append_field(ma_trailer, &in_trace);
        }

        msize += node_id_len;
        qd_buffer_list_append(ma_trailer, node_id, node_id_len);
    }

    if (msize) {
        // setup the MA section descriptor:
        ma_header[0] = 0;
        ma_header[1] = QD_AMQP_SMALLULONG;
        ma_header[2] = QD_PERFORMATIVE_MESSAGE_ANNOTATIONS;

        // setup the MA MAP header
        const int hdr_size = qd_compose_map_header(&ma_header[3], msize, mcount);
        return hdr_size + 3;
    }

    return 0;
}