static int pni_process_tpwork_sender()

in c/src/core/transport.c [2220:2287]


static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery, bool *settle)
{
  pn_link_t *link = delivery->link;
  pn_delivery_state_t *state = &delivery->state;
  if (delivery->aborted && !delivery->state.sending) {
    // Aborted delivery with no data yet sent, drop it and issue a FLOW as we may have credit.
    *settle = true;
    state->sent = true;
    pn_collector_put_object(transport->connection->collector, link, PN_LINK_FLOW);
    return 0;
  }
  *settle = false;
  pn_session_state_t *ssn_state = &link->session->state;
  pn_link_state_t *link_state = &link->state;
  bool xfr_posted = false;
  if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
    if (!state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
        ssn_state->remote_incoming_window > 0 && link_state->link_credit > 0) {
      if (!state->init) {
        state = pni_delivery_map_push(&ssn_state->outgoing, delivery);
      }

      pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes);
      size_t full_size = bytes.size;
      int count = pni_post_amqp_transfer_frame(transport,
                                               ssn_state->local_channel,
                                               link_state->local_handle,
                                               state->id, &bytes, delivery->tag,
                                               0, // message-format
                                               delivery->local.settled,
                                               !delivery->done,
                                               ssn_state->remote_incoming_window,
                                               &delivery->local,
                                               false, /* Resume */
                                               delivery->aborted,
                                               false /* Batchable */
      );
      if (count < 0) return count;
      state->sending = true;
      xfr_posted = true;
      ssn_state->outgoing_transfer_count += count;
      ssn_state->remote_incoming_window -= count;

      int sent = full_size - bytes.size;
      pn_buffer_trim(delivery->bytes, sent, 0);
      link->session->outgoing_bytes -= sent;
      if (!pn_buffer_size(delivery->bytes) && delivery->done) {
        state->sent = true;
        link_state->delivery_count++;
        link_state->link_credit--;
        link->queued--;
        link->session->outgoing_deliveries--;
      }

      pn_collector_put_object(transport->connection->collector, link, PN_LINK_FLOW);
    }
  }

  if (!state->init) state = NULL;
  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote.settled
      && state && state->sent && !xfr_posted) {
    int err = pni_post_disp(transport, delivery);
    if (err) return err;
  }

  *settle = delivery->local.settled && state && state->sent;
  return 0;
}