int pn_do_transfer()

in c/src/core/transport.c [1336:1471]


/**
 * Process an incoming transfer received on `channel`.
 *
 * The transfer's fields are decoded from the front of `payload`; whatever
 * remains after the decoded fields is treated as delivery content. The
 * function then:
 *   1. looks up the session for `channel` and the link for the decoded handle,
 *   2. decides whether this frame continues the link's in-progress
 *      multi-frame delivery or starts a new one,
 *   3. appends the content to the delivery and updates its done / settled /
 *      aborted state,
 *   4. updates the session's incoming byte, transfer and window counters and
 *      flags a flow check when the window drops below the low-water mark.
 *
 * @param transport  transport the frame arrived on; used to find the session,
 *                   to report errors, and to reach the connection/collector
 * @param frame_type not read by this function
 * @param channel    channel number used to look up the session
 * @param payload    encoded transfer fields followed by the delivery content
 * @return 0 on success, otherwise whatever pn_do_error() returns
 */
int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
{
  // XXX: multi transfer
  // Outputs of the decode call below. `resume` and `batchable` are decoded
  // but never read afterwards in this function.
  uint32_t handle;
  pn_bytes_t tag;
  bool id_present;
  pn_sequence_t id;
  bool settled;
  bool more;
  bool has_type, settled_set;
  bool resume, aborted, batchable;
  uint64_t type;

  pn_bytes_t disp_data;
  // dsize is treated as the number of bytes the decoder consumed: the payload
  // view is advanced past it so that only the delivery content remains.
  // NOTE(review): dsize is not checked against payload.size before the
  // subtraction — confirm the decoder never reports more than it was given.
  // NOTE(review): presumably the decoder assigns defaults to fields that are
  // absent on the wire (e.g. `settled` when `settled_set` is false) — confirm.
  size_t dsize =
    pn_amqp_decode_DqEIQIzqQooqDQLRoooe(payload, &handle, &id_present, &id, &tag,
                                        &settled_set, &settled, &more, &has_type, &type, &disp_data,
                                        &resume, &aborted, &batchable);
  payload.size -= dsize;
  payload.start += dsize;

  // Resolve the session for this channel; an unknown channel is an error.
  pn_session_t *ssn = pni_channel_state(transport, channel);
  if (!ssn) {
    return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
  }

  // A transfer arriving while the incoming window is zero is a violation.
  if (!ssn->state.incoming_window) {
    return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded");
  }

  // Resolve the link for the decoded handle within that session.
  pn_link_t *link = pni_handle_state(ssn, handle);
  if (!link) {
    return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle);
  }

  // Decide which delivery this frame belongs to. After this block exactly one
  // of the following holds:
  //   - delivery != NULL : continuation of the link's current delivery
  //   - new_delivery     : a fresh delivery must be created
  //   - neither          : continuation of a delivery that no longer exists;
  //                        the content is dropped
  pn_delivery_t *delivery = NULL;
  bool new_delivery = false;
  if (link->more_pending) {
    // Ongoing multiframe delivery.
    if (link->unsettled_tail && !link->unsettled_tail->done) {
      // The in-progress delivery is still the tail of the unsettled list.
      delivery = link->unsettled_tail;
      // A continuation frame may not explicitly revert settled to unsettled.
      if (settled_set && !settled && delivery->remote.settled)
        return pn_do_error(transport, "amqp:invalid-field", "invalid transition from settled to unsettled");
      // If the frame carries a delivery-id it must match the delivery's id.
      if (id_present && id != delivery->state.id)
        return pn_do_error(transport, "amqp:invalid-field", "invalid delivery-id for a continuation transfer");
    } else {
      // Application has already settled.  Delivery is no more.
      // Ignore content and look for transition to a new delivery.
      // link->more_id (recorded on the first frame) is the only remaining
      // way to tell old frames from a new delivery.
      if (!id_present || id == link->more_id) {
        // Still old delivery.
        // The final (or aborting) frame ends the multi-frame state.
        if (!more || aborted)
          link->more_pending = false;
      } else {
        // New id.
        new_delivery = true;
        link->more_pending = false;
      }
    }
  } else {
    new_delivery = true;
  }

  if (new_delivery) {
    assert(!link->more_pending);
    assert(delivery == NULL);
    pn_delivery_map_t *incoming = &ssn->state.incoming;

    // First new delivery seen on this session: seed the incoming map's next
    // id from the peer's delivery-id, which therefore must be present.
    if (!ssn->state.incoming_init) {
      if (!id_present) {
        return pn_do_error(transport, "amqp:invalid-field", "delivery-id required on initial transfer of session");
      }
      incoming->next = id;
      ssn->state.incoming_init = true;
      ssn->incoming_deliveries++;
    }

    // NOTE(review): the result of pn_delivery() is used without a NULL
    // check — confirm whether it can fail (e.g. on allocation failure).
    delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
    pn_delivery_state_t *state = pni_delivery_map_push(incoming, delivery);
    // The id assigned by the map must agree with the id the peer sent, if it
    // sent one. Note that on mismatch the delivery has already been created
    // and pushed onto the map when the error is returned.
    if (id_present && id != state->id) {
      return pn_do_error(transport, "amqp:session:invalid-field",
                         "sequencing error, expected delivery-id %u, got %u",
                         state->id, id);
    }
    // Decode the optional state carried by the transfer into the delivery's
    // remote disposition.
    if (has_type) {
      pni_amqp_decode_disposition(type, disp_data, &delivery->remote);
    }
    // Per-link accounting for the newly started delivery.
    link->state.delivery_count++;
    link->state.link_credit--;
    link->queued++;
  }

  if (delivery) {
    // Accumulate this frame's content on the delivery.
    pn_buffer_append(delivery->bytes, payload.start, payload.size);
    if (more) {
      if (!link->more_pending) {
        // The id is remembered below so later frames can be matched even if
        // the delivery itself is gone, hence it is mandatory here. The
        // content has already been appended when this error is returned.
        if (!id_present) {
          return pn_do_error(transport, "amqp:invalid-field", "delivery-id required for transfer");
        }
        // First frame of a multi-frame transfer. Remember at link level.
        link->more_pending = true;
        link->more_id = id;
      }
      delivery->done = false;
    }
    else
      delivery->done = true;

    // XXX: need to fill in remote state: delivery->remote.state = ...;
    // Remote settlement only ever moves from false to true here; a frame
    // with settled == false never clears delivery->remote.settled.
    if (settled && !delivery->remote.settled) {
      delivery->remote.settled = settled;
      delivery->updated = true;
      pn_work_update(transport->connection, delivery);
    }

    // Intentional assignment in the condition: record the abort flag on the
    // delivery and act on it. An aborted delivery is marked remotely settled
    // and done, flagged as updated, and ends the link's multi-frame state.
    if ((delivery->aborted = aborted)) {
      delivery->remote.settled = true;
      delivery->done = true;
      delivery->updated = true;
      link->more_pending = false;
      pn_work_update(transport->connection, delivery);
    }
    // Hand the delivery to the connection's collector tagged PN_DELIVERY.
    pn_collector_put_object(transport->connection->collector, delivery, PN_DELIVERY);
  }

  // Session accounting. This runs for every frame that reaches this point,
  // including frames whose content was dropped above (delivery == NULL).
  ssn->incoming_bytes += payload.size;
  ssn->state.incoming_transfer_count++;
  ssn->state.incoming_window--;

  // When the remaining window falls below the session's low-water mark, set
  // check_flow once and mark the link's endpoint as modified.
  // NOTE(review): the (int32_t) cast suggests a negative local_handle means
  // the link has no local handle assigned — confirm.
  // NOTE(review): presumably pn_modified() schedules the endpoint so the
  // flow check is acted upon later — confirm against its definition.
  if ((int32_t) link->state.local_handle >= 0 && ssn->state.incoming_window < ssn->incoming_window_lwm) {
    if (!ssn->check_flow) {
      ssn->check_flow = true;
      pn_modified(ssn->connection, &link->endpoint, false);
    }
  }

  return 0;
}