in c/src/core/transport.c [1336:1471]
int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
{
// XXX: multi transfer
uint32_t handle;
pn_bytes_t tag;
bool id_present;
pn_sequence_t id;
bool settled;
bool more;
bool has_type, settled_set;
bool resume, aborted, batchable;
uint64_t type;
pn_bytes_t disp_data;
size_t dsize =
pn_amqp_decode_DqEIQIzqQooqDQLRoooe(payload, &handle, &id_present, &id, &tag,
&settled_set, &settled, &more, &has_type, &type, &disp_data,
&resume, &aborted, &batchable);
payload.size -= dsize;
payload.start += dsize;
pn_session_t *ssn = pni_channel_state(transport, channel);
if (!ssn) {
return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
}
if (!ssn->state.incoming_window) {
return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded");
}
pn_link_t *link = pni_handle_state(ssn, handle);
if (!link) {
return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle);
}
pn_delivery_t *delivery = NULL;
bool new_delivery = false;
if (link->more_pending) {
// Ongoing multiframe delivery.
if (link->unsettled_tail && !link->unsettled_tail->done) {
delivery = link->unsettled_tail;
if (settled_set && !settled && delivery->remote.settled)
return pn_do_error(transport, "amqp:invalid-field", "invalid transition from settled to unsettled");
if (id_present && id != delivery->state.id)
return pn_do_error(transport, "amqp:invalid-field", "invalid delivery-id for a continuation transfer");
} else {
// Application has already settled. Delivery is no more.
// Ignore content and look for transition to a new delivery.
if (!id_present || id == link->more_id) {
// Still old delivery.
if (!more || aborted)
link->more_pending = false;
} else {
// New id.
new_delivery = true;
link->more_pending = false;
}
}
} else {
new_delivery = true;
}
if (new_delivery) {
assert(!link->more_pending);
assert(delivery == NULL);
pn_delivery_map_t *incoming = &ssn->state.incoming;
if (!ssn->state.incoming_init) {
if (!id_present) {
return pn_do_error(transport, "amqp:invalid-field", "delivery-id required on initial transfer of session");
}
incoming->next = id;
ssn->state.incoming_init = true;
ssn->incoming_deliveries++;
}
delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
pn_delivery_state_t *state = pni_delivery_map_push(incoming, delivery);
if (id_present && id != state->id) {
return pn_do_error(transport, "amqp:session:invalid-field",
"sequencing error, expected delivery-id %u, got %u",
state->id, id);
}
if (has_type) {
pni_amqp_decode_disposition(type, disp_data, &delivery->remote);
}
link->state.delivery_count++;
link->state.link_credit--;
link->queued++;
}
if (delivery) {
pn_buffer_append(delivery->bytes, payload.start, payload.size);
if (more) {
if (!link->more_pending) {
if (!id_present) {
return pn_do_error(transport, "amqp:invalid-field", "delivery-id required for transfer");
}
// First frame of a multi-frame transfer. Remember at link level.
link->more_pending = true;
link->more_id = id;
}
delivery->done = false;
}
else
delivery->done = true;
// XXX: need to fill in remote state: delivery->remote.state = ...;
if (settled && !delivery->remote.settled) {
delivery->remote.settled = settled;
delivery->updated = true;
pn_work_update(transport->connection, delivery);
}
if ((delivery->aborted = aborted)) {
delivery->remote.settled = true;
delivery->done = true;
delivery->updated = true;
link->more_pending = false;
pn_work_update(transport->connection, delivery);
}
pn_collector_put_object(transport->connection->collector, delivery, PN_DELIVERY);
}
ssn->incoming_bytes += payload.size;
ssn->state.incoming_transfer_count++;
ssn->state.incoming_window--;
if ((int32_t) link->state.local_handle >= 0 && ssn->state.incoming_window < ssn->incoming_window_lwm) {
if (!ssn->check_flow) {
ssn->check_flow = true;
pn_modified(ssn->connection, &link->endpoint, false);
}
}
return 0;
}