in src/message.c [1693:1840]
// Compose the router-owned part of the outgoing message annotations (MA)
// section.
//
// The encoded router annotation key/value pairs are appended to ma_trailer in
// this fixed order: phase, streaming, to-override, ingress, trace.  Each pair is
// emitted only when the corresponding flag/value on msg allows it.  The MA
// section descriptor and the map header are written to ma_header; the size and
// entry count placed in that map header also include the user annotations
// recorded in the message content (ma_user_annotations / ma_user_count).
//
// msg        - message whose annotations are being composed
// ma_header  - output: 3 octet section descriptor followed by the encoded map
//              header.  NOTE(review): the caller must supply enough room for
//              both; the map header length is decided by qd_compose_map_header.
// ma_trailer - buffer list receiving the encoded router annotations
//
// Returns the number of octets written to ma_header, or 0 when the combined
// annotation size is zero (ma_header is not written in that case).
//
static int compose_router_message_annotations(qd_message_pvt_t *msg, uint8_t *ma_header,
                                              qd_buffer_list_t *ma_trailer)
{
    qd_message_content_t *content = msg->content;

    // Seed the totals with the user annotations, which are sent ahead of the
    // router annotations.
    //
    uint32_t map_count = content->ma_user_count;
    uint32_t map_size  = content->ma_user_annotations.remaining;

    //
    // phase: encoded as a smallint (tag + one octet)
    //
    if (msg->ma_phase) {
        assert(msg->ma_phase < 128);  // smallint
        uint8_t phase_value[2] = {QD_AMQP_SMALLINT, (uint8_t) msg->ma_phase};

        qd_buffer_list_append(ma_trailer, QD_MA_PHASE_ENCODED, QD_MA_PHASE_ENCODED_LEN);
        qd_buffer_list_append(ma_trailer, phase_value, sizeof(phase_value));

        map_count += 2;  // one key + one value
        map_size += QD_MA_PHASE_ENCODED_LEN;
        map_size += sizeof(phase_value);
    }

    //
    // streaming: historically sent as int value 1
    //
    if (msg->ma_streaming) {
        const uint8_t stream_value[2] = {QD_AMQP_SMALLINT, 1};

        qd_buffer_list_append(ma_trailer, QD_MA_STREAM_ENCODED, QD_MA_STREAM_ENCODED_LEN);
        qd_buffer_list_append(ma_trailer, stream_value, sizeof(stream_value));

        map_count += 2;
        map_size += QD_MA_STREAM_ENCODED_LEN;
        map_size += sizeof(stream_value);
    }

    //
    // to-override: the value set on this message takes precedence over the
    // value carried by the original received message, which allows the
    // to-override to be overridden
    //
    if (msg->ma_to_override || content->ma_pf_to_override) {
        uint8_t str_hdr[5];  // max length of encoded str8/32 header

        map_count += 2;
        map_size += QD_MA_TO_ENCODED_LEN;
        qd_buffer_list_append(ma_trailer, QD_MA_TO_ENCODED, QD_MA_TO_ENCODED_LEN);

        if (msg->ma_to_override) {
            const size_t override_len = strlen(msg->ma_to_override);
            const int    str_hdr_len  = qd_compose_str_header(str_hdr, override_len);

            qd_buffer_list_append(ma_trailer, str_hdr, str_hdr_len);
            qd_buffer_list_append(ma_trailer, (uint8_t*) msg->ma_to_override, override_len);

            map_size += str_hdr_len;
            map_size += override_len;
        } else {
            qd_buffer_field_t pf_to       = qd_parse_value(content->ma_pf_to_override);
            const int         str_hdr_len = qd_compose_str_header(str_hdr, pf_to.remaining);

            map_size += str_hdr_len;
            qd_buffer_list_append(ma_trailer, str_hdr, str_hdr_len);

            // account for the payload before handing the field to the append
            map_size += pf_to.remaining;
            qd_buffer_list_append_field(ma_trailer, &pf_to);
        }
    }

    //
    // ingress: forward the received value when present and not reset,
    // otherwise the local node is the ingress
    //
    if (!msg->ma_filter_ingress) {
        map_count += 2;
        map_size += QD_MA_INGRESS_ENCODED_LEN;
        qd_buffer_list_append(ma_trailer, QD_MA_INGRESS_ENCODED, QD_MA_INGRESS_ENCODED_LEN);

        if (!content->ma_pf_ingress || msg->ma_reset_ingress) {
            // qd_router_id_encoded supplies the bytes and their length; they
            // are appended as-is with no separate string header
            size_t         local_id_len;
            const uint8_t *local_id = qd_router_id_encoded(&local_id_len);

            map_size += local_id_len;
            qd_buffer_list_append(ma_trailer, local_id, local_id_len);
        } else {
            uint8_t           str_hdr[5];  // max size str8/32 header
            qd_buffer_field_t pf_ingress  = qd_parse_value(content->ma_pf_ingress);
            const int         str_hdr_len = qd_compose_str_header(str_hdr, pf_ingress.remaining);

            map_size += str_hdr_len;
            qd_buffer_list_append(ma_trailer, str_hdr, str_hdr_len);

            map_size += pf_ingress.remaining;
            qd_buffer_list_append_field(ma_trailer, &pf_ingress);
        }
    }

    //
    // trace: a list holding the received trace entries (unless absent or
    // reset) followed by the local node id
    //
    if (!msg->ma_filter_trace) {
        map_count += 2;

        size_t         local_id_len;
        const uint8_t *local_id = qd_router_id_encoded(&local_id_len);

        const bool keep_received = content->ma_pf_trace && !msg->ma_reset_trace;
        uint32_t   list_count    = 1;             // the local node
        uint32_t   list_len      = local_id_len;  // payload octets in the list

        map_size += QD_MA_TRACE_ENCODED_LEN;
        qd_buffer_list_append(ma_trailer, QD_MA_TRACE_ENCODED, QD_MA_TRACE_ENCODED_LEN);

        // The list header goes out first, so the totals for the whole list
        // have to be known before anything else is appended.
        qd_buffer_field_t received = {0};
        if (keep_received) {
            received = qd_parse_value(content->ma_pf_trace);
            list_len += received.remaining;
            list_count += qd_parse_sub_count(content->ma_pf_trace);
        }

        uint8_t   list_hdr[9];  // max len encoded list header
        const int list_hdr_len = qd_compose_list_header(list_hdr, list_len, list_count);
        map_size += list_hdr_len;
        qd_buffer_list_append(ma_trailer, list_hdr, list_hdr_len);

        if (keep_received) {
            map_size += received.remaining;
            qd_buffer_list_append_field(ma_trailer, &received);
        }

        map_size += local_id_len;
        qd_buffer_list_append(ma_trailer, local_id, local_id_len);
    }

    // Nothing to send: no header is produced.
    if (!map_size) {
        return 0;
    }

    // MA section descriptor
    ma_header[0] = 0;
    ma_header[1] = QD_AMQP_SMALLULONG;
    ma_header[2] = QD_PERFORMATIVE_MESSAGE_ANNOTATIONS;

    // MA map header follows the 3 octet descriptor
    const int map_hdr_len = qd_compose_map_header(ma_header + 3, map_size, map_count);
    return map_hdr_len + 3;
}