in c/src/core/transport.c [2220:2287]
/*
 * Drives the outgoing side of a single delivery: posts a transfer frame when
 * there is something to send and the peer can accept it, otherwise posts a
 * disposition for a delivery whose transfer has already completed.
 *
 * transport  transport whose connection collector receives PN_LINK_FLOW events
 * delivery   delivery being processed
 * settle     out: true when the delivery was aborted before any data was
 *            sent, or when it is locally settled and fully sent; false
 *            otherwise (including on every error return)
 *
 * Returns 0 on success; otherwise the negative value reported by
 * pni_post_amqp_transfer_frame() or the non-zero value reported by
 * pni_post_disp().
 */
static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery, bool *settle)
{
  pn_link_t *link = delivery->link;
  pn_delivery_state_t *state = &delivery->state;

  // An aborted delivery for which nothing has gone out yet is simply dropped.
  // A FLOW event is still raised because credit may be available.
  if (delivery->aborted && !state->sending) {
    state->sent = true;
    *settle = true;
    pn_collector_put_object(transport->connection->collector, link, PN_LINK_FLOW);
    return 0;
  }

  pn_session_state_t *ssn = &link->session->state;
  pn_link_state_t *lnk = &link->state;
  bool posted_transfer = false;
  *settle = false;

  // The signed casts make an unassigned channel/handle compare as negative
  // (presumably the sentinel is an all-ones value -- confirm against the
  // session/link state initialisation).
  if ((int16_t) ssn->local_channel >= 0 && (int32_t) lnk->local_handle >= 0 &&
      !state->sent &&
      (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
      ssn->remote_incoming_window > 0 && lnk->link_credit > 0) {
    // First transfer for this delivery: register it in the session's outgoing
    // map and continue with the state entry the map hands back.
    if (!state->init) {
      state = pni_delivery_map_push(&ssn->outgoing, delivery);
    }

    pn_bytes_t payload = pn_buffer_bytes(delivery->bytes);
    const size_t size_before = payload.size;
    int frames = pni_post_amqp_transfer_frame(transport,
                                              ssn->local_channel,
                                              lnk->local_handle,
                                              state->id, &payload, delivery->tag,
                                              0, // message-format
                                              delivery->local.settled,
                                              !delivery->done,
                                              ssn->remote_incoming_window,
                                              &delivery->local,
                                              false, /* Resume */
                                              delivery->aborted,
                                              false /* Batchable */
                                              );
    if (frames < 0) {
      return frames;
    }

    posted_transfer = true;
    state->sending = true;
    ssn->outgoing_transfer_count += frames;
    ssn->remote_incoming_window -= frames;

    // Whatever the frame(s) took is measured by how much `payload` shrank;
    // trim that amount from the delivery buffer and the session byte count.
    int consumed = size_before - payload.size;
    pn_buffer_trim(delivery->bytes, consumed, 0);
    link->session->outgoing_bytes -= consumed;

    // Buffer drained and the application is done: the delivery is fully sent,
    // so update the link and session accounting.
    if (delivery->done && !pn_buffer_size(delivery->bytes)) {
      state->sent = true;
      lnk->delivery_count++;
      lnk->link_credit--;
      link->queued--;
      link->session->outgoing_deliveries--;
    }

    pn_collector_put_object(transport->connection->collector, link, PN_LINK_FLOW);
  }

  // Only an initialised state entry counts as tracked from here on.
  pn_delivery_state_t *tracked = state->init ? state : NULL;

  // A disposition is posted only for a fully sent delivery that is not
  // remotely settled, and only when no transfer was posted in this call.
  if ((int16_t) ssn->local_channel >= 0 && !delivery->remote.settled &&
      tracked && tracked->sent && !posted_transfer) {
    int err = pni_post_disp(transport, delivery);
    if (err) {
      return err;
    }
  }

  *settle = delivery->local.settled && tracked && tracked->sent;
  return 0;
}