in c/src/core/message.c [731:838]
int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
{
assert(msg && bytes && size);
pn_bytes_t msg_bytes = {.size=size, .start=bytes};
pn_bytes_t instructions_bytes = {0, 0};
pn_bytes_t annotations_bytes = {0, 0};
pn_bytes_t properties_bytes = {0, 0};
pn_bytes_t body_bytes = {0, 0};
pn_bytes_t unknown_section_bytes = {0, 0};
while (msg_bytes.size) {
bool scanned;
uint64_t desc;
size_t section_size = pn_amqp_decode_DQLq(msg_bytes, &scanned, &desc);
if (!scanned) {
desc = 0;
}
switch (desc) {
case AMQP_DESC_HEADER: {
bool priority_q;
uint8_t priority;
pn_amqp_decode_DqEoQBIoIe(msg_bytes,
&msg->durable,
&priority_q, &priority,
&msg->ttl,
&msg->first_acquirer,
&msg->delivery_count);
msg->priority = priority_q ? priority : AMQP_HEADER_PRIORITY_DEFAULT;
break;
}
case AMQP_DESC_PROPERTIES: {
pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
group_id, reply_to_group_id;
pn_atom_t id;
pn_atom_t correlation_id;
pn_amqp_decode_DqEazSSSassttSISe(msg_bytes, &id,
&user_id, &address, &subject, &reply_to,
&correlation_id, &ctype, &cencoding,
&msg->expiry_time, &msg->creation_time, &group_id,
&msg->group_sequence, &reply_to_group_id);
pni_msgid_fix_interop(&id);
pn_message_set_id(msg, id);
int err = pn_string_set_bytes(msg->user_id, user_id);
if (err) return pn_error_format(msg->error, err, "error setting user_id");
err = pn_string_setn(msg->address, address.start, address.size);
if (err) return pn_error_format(msg->error, err, "error setting address");
err = pn_string_setn(msg->subject, subject.start, subject.size);
if (err) return pn_error_format(msg->error, err, "error setting subject");
err = pn_string_setn(msg->reply_to, reply_to.start, reply_to.size);
if (err) return pn_error_format(msg->error, err, "error setting reply_to");
pni_msgid_fix_interop(&correlation_id);
pn_message_set_correlation_id(msg, correlation_id);
err = pn_string_setn(msg->content_type, ctype.start, ctype.size);
if (err) return pn_error_format(msg->error, err, "error setting content_type");
err = pn_string_setn(msg->content_encoding, cencoding.start,
cencoding.size);
if (err) return pn_error_format(msg->error, err, "error setting content_encoding");
err = pn_string_setn(msg->group_id, group_id.start, group_id.size);
if (err) return pn_error_format(msg->error, err, "error setting group_id");
err = pn_string_setn(msg->reply_to_group_id, reply_to_group_id.start,
reply_to_group_id.size);
if (err) return pn_error_format(msg->error, err, "error setting reply_to_group_id");
break;
}
case AMQP_DESC_DELIVERY_ANNOTATIONS: {
pn_amqp_decode_DqR(msg_bytes, &instructions_bytes);
break;
}
case AMQP_DESC_MESSAGE_ANNOTATIONS: {
pn_amqp_decode_DqR(msg_bytes, &annotations_bytes);
break;
}
case AMQP_DESC_APPLICATION_PROPERTIES: {
pn_amqp_decode_DqR(msg_bytes, &properties_bytes);
break;
}
case AMQP_DESC_DATA:
case AMQP_DESC_AMQP_SEQUENCE: {
msg->inferred = true;
pn_amqp_decode_DqR(msg_bytes, &body_bytes);
break;
}
case AMQP_DESC_AMQP_VALUE: {
msg->inferred = false;
pn_amqp_decode_DqR(msg_bytes, &body_bytes);
break;
}
case AMQP_DESC_FOOTER:
break;
default: {
pn_amqp_decode_R(msg_bytes, &unknown_section_bytes);
break;
}
}
msg_bytes = (pn_bytes_t){.size=msg_bytes.size-section_size, .start=msg_bytes.start+section_size};
}
pn_bytes_free(msg->instructions_raw);
msg->instructions_raw = pn_bytes_dup(instructions_bytes);
pn_bytes_free(msg->annotations_raw);
msg->annotations_raw = pn_bytes_dup(annotations_bytes);
pn_bytes_free(msg->properties_raw);
msg->properties_raw = pn_bytes_dup(properties_bytes);
pn_bytes_free(msg->body_raw);
msg->body_raw = pn_bytes_dup(body_bytes);
return 0;
}