in c/src/core/message.c [707:812]
int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size)
{
assert(msg && bytes && size);
pn_bytes_t msg_bytes = {.size=size, .start=bytes};
while (msg_bytes.size) {
bool scanned;
uint64_t desc;
size_t section_size = pn_amqp_decode_DQLq(msg_bytes, &scanned, &desc);
if (!scanned) {
desc = 0;
}
switch (desc) {
case HEADER: {
bool priority_q;
uint8_t priority;
pn_amqp_decode_DqEoQBIoIe(msg_bytes,
&msg->durable,
&priority_q, &priority,
&msg->ttl,
&msg->first_acquirer,
&msg->delivery_count);
msg->priority = priority_q ? priority : HEADER_PRIORITY_DEFAULT;
break;
}
case PROPERTIES: {
pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
group_id, reply_to_group_id;
pn_atom_t id;
pn_atom_t correlation_id;
pn_amqp_decode_DqEazSSSassttSISe(msg_bytes, &id,
&user_id, &address, &subject, &reply_to,
&correlation_id, &ctype, &cencoding,
&msg->expiry_time, &msg->creation_time, &group_id,
&msg->group_sequence, &reply_to_group_id);
pni_msgid_fix_interop(&id);
pn_message_set_id(msg, id);
int err = pn_string_set_bytes(msg->user_id, user_id);
if (err) return pn_error_format(msg->error, err, "error setting user_id");
err = pn_string_setn(msg->address, address.start, address.size);
if (err) return pn_error_format(msg->error, err, "error setting address");
err = pn_string_setn(msg->subject, subject.start, subject.size);
if (err) return pn_error_format(msg->error, err, "error setting subject");
err = pn_string_setn(msg->reply_to, reply_to.start, reply_to.size);
if (err) return pn_error_format(msg->error, err, "error setting reply_to");
pni_msgid_fix_interop(&correlation_id);
pn_message_set_correlation_id(msg, correlation_id);
err = pn_string_setn(msg->content_type, ctype.start, ctype.size);
if (err) return pn_error_format(msg->error, err, "error setting content_type");
err = pn_string_setn(msg->content_encoding, cencoding.start,
cencoding.size);
if (err) return pn_error_format(msg->error, err, "error setting content_encoding");
err = pn_string_setn(msg->group_id, group_id.start, group_id.size);
if (err) return pn_error_format(msg->error, err, "error setting group_id");
err = pn_string_setn(msg->reply_to_group_id, reply_to_group_id.start,
reply_to_group_id.size);
if (err) return pn_error_format(msg->error, err, "error setting reply_to_group_id");
break;
}
case DELIVERY_ANNOTATIONS: {
pn_data_clear(msg->instructions);
pn_amqp_decode_DqC(msg_bytes, msg->instructions);
pn_data_rewind(msg->instructions);
break;
}
case MESSAGE_ANNOTATIONS: {
pn_data_clear(msg->annotations);
pn_amqp_decode_DqC(msg_bytes, msg->annotations);
pn_data_rewind(msg->annotations);
break;
}
case APPLICATION_PROPERTIES: {
pn_data_clear(msg->properties);
pn_amqp_decode_DqC(msg_bytes, msg->properties);
pn_data_rewind(msg->properties);
break;
}
case DATA:
case AMQP_SEQUENCE: {
msg->inferred = true;
pn_data_clear(msg->body);
pn_amqp_decode_DqC(msg_bytes, msg->body);
pn_data_rewind(msg->body);
break;
}
case AMQP_VALUE: {
msg->inferred = false;
pn_data_clear(msg->body);
pn_amqp_decode_DqC(msg_bytes, msg->body);
pn_data_rewind(msg->body);
break;
}
case FOOTER:
break;
default: {
pn_data_clear(msg->body);
pn_data_decode(msg->body, msg_bytes.start, msg_bytes.size);
pn_data_rewind(msg->body);
break;
}
}
msg_bytes = (pn_bytes_t){.size=msg_bytes.size-section_size, .start=msg_bytes.start+section_size};
}
return 0;
}