int pn_do_attach()

in c/src/core/transport.c [1215:1366]


/*
 * Handle an incoming ATTACH performative.
 *
 * Decodes the attach fields from `payload`, finds the link with the decoded
 * name and direction on the session bound to `channel` (creating a sender or
 * receiver if none exists), and records the remote side's view on that link:
 * remote handle, source/target terminus settings, settle modes, initial
 * delivery count, max message size and (optionally) link properties.
 * Finally a PN_LINK_REMOTE_OPEN event is put on the connection's collector.
 *
 * Parameters:
 *   transport  - transport the frame arrived on
 *   frame_type - not used by this handler
 *   channel    - channel the frame arrived on; used to look up the session
 *   payload    - encoded performative bytes
 *
 * Returns 0 on success. Returns PN_EOS after reporting a condition through
 * pn_do_error() when the channel has no session, the handle exceeds the
 * session's local_handle_max, the link name is already attached, or memory
 * for the link name / link could not be obtained.
 */
int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
{
  pn_bytes_t name;
  uint32_t handle;
  bool is_sender;
  pn_bytes_t source, target;
  pn_durability_t src_dr, tgt_dr;
  pn_bytes_t src_exp, tgt_exp;
  pn_seconds_t src_timeout, tgt_timeout;
  bool src_dynamic, tgt_dynamic;
  pn_sequence_t idc;
  pn_bytes_t dist_mode;
  bool snd_settle, rcv_settle;
  uint8_t snd_settle_mode, rcv_settle_mode;
  uint64_t max_msgsz;
  bool has_props;
  /* Owned by this function until it is either handed to the link or freed. */
  pn_data_t *rem_props = pn_data(0);
  pn_amqp_decode_DqESIoQBQBDqESIsIoqseDqESIsIoeqqILqqQCe(payload,
                                                         &name, &handle,
                                                         &is_sender,
                                                         &snd_settle, &snd_settle_mode,
                                                         &rcv_settle, &rcv_settle_mode,
                                                         &source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode,
                                                         &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
                                                         &idc, &max_msgsz, &has_props, rem_props);

  /*
   * Build a NUL-terminated copy of the link name. Short names live in the
   * stack buffer; longer ones need a heap buffer. If that allocation fails we
   * must NOT fall back to the stack buffer (the name does not fit), so the
   * attach is rejected instead.
   */
  char strbuf[128];      // avoid malloc for most link names
  char *strheap = NULL;
  char *strname = strbuf;
  if (name.size >= sizeof(strbuf)) {
    strheap = (char *) malloc(name.size + 1);
    if (!strheap) {
      pn_do_error(transport, "amqp:resource-limit-exceeded",
                  "out of memory copying link name");
      pn_free(rem_props);
      return PN_EOS;
    }
    strname = strheap;
  }
  /* Length is known and the terminator is written explicitly below. */
  if (name.size > 0) memcpy(strname, name.start, name.size);
  strname[name.size] = '\0';

  pn_session_t *ssn = pni_channel_state(transport, channel);
  if (!ssn) {
    pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
    free(strheap);
    pn_free(rem_props);
    return PN_EOS;
  }
  if (handle > ssn->local_handle_max) {
    pn_do_error(transport, "amqp:connection:framing-error",
                "remote handle %u is above handle_max %u", handle, ssn->local_handle_max);
    free(strheap);
    pn_free(rem_props);
    return PN_EOS;
  }
  pn_link_t *link = pni_find_link(ssn, name, is_sender);
  /* A non-negative remote handle means the peer already attached this link. */
  if (link && (int32_t)link->state.remote_handle >= 0) {
    pn_do_error(transport, "amqp:invalid-field", "link name already attached: %s", strname);
    free(strheap);
    pn_free(rem_props);
    return PN_EOS;
  }
  if (!link) {                  /* Make a new link for the attach */
    if (is_sender) {
      link = (pn_link_t *) pn_sender(ssn, strname);
    } else {
      link = (pn_link_t *) pn_receiver(ssn, strname);
    }
  }

  /* The name copy is only needed for lookup/creation and error text. */
  free(strheap);
  strheap = NULL;
  strname = NULL;

  /*
   * Defensive: never dereference a missing link.
   * NOTE(review): whether pn_sender()/pn_receiver() can actually return NULL
   * is not visible from this function - confirm against their definitions.
   */
  if (!link) {
    pn_do_error(transport, "amqp:resource-limit-exceeded",
                "unable to create link for attach");
    pn_free(rem_props);
    return PN_EOS;
  }

  if (has_props) {
    /*
     * NOTE(review): if this link was attached before and still holds remote
     * properties, the old object is overwritten here without being released.
     * Confirm ownership rules before adding a free of the previous value.
     */
    link->remote_properties = rem_props;
  } else {
    pn_free(rem_props);
  }

  pni_map_remote_handle(link, handle);
  PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);

  /* Remote source: populated only if an address was sent or it is dynamic. */
  pn_terminus_t *rsrc = &link->remote_source;
  if (source.start || src_dynamic) {
    pn_terminus_set_type(rsrc, PN_SOURCE);
    pn_terminus_set_address_bytes(rsrc, source);
    pn_terminus_set_durability(rsrc, src_dr);
    set_expiry_policy_from_symbol(rsrc, src_exp);
    pn_terminus_set_timeout(rsrc, src_timeout);
    pn_terminus_set_dynamic(rsrc, src_dynamic);
    pn_terminus_set_distribution_mode(rsrc, symbol2dist_mode(dist_mode));
  } else {
    pn_terminus_set_type(rsrc, PN_UNSPECIFIED);
  }

  /* Remote target: same rule; otherwise classify it by its descriptor code. */
  pn_terminus_t *rtgt = &link->remote_target;
  if (target.start || tgt_dynamic) {
    pn_terminus_set_type(rtgt, PN_TARGET);
    pn_terminus_set_address_bytes(rtgt, target);
    pn_terminus_set_durability(rtgt, tgt_dr);
    set_expiry_policy_from_symbol(rtgt, tgt_exp);
    pn_terminus_set_timeout(rtgt, tgt_timeout);
    pn_terminus_set_dynamic(rtgt, tgt_dynamic);
  } else {
    /* Re-scan the payload just for the target's descriptor code. */
    uint64_t code = 0;
    pn_amqp_decode_DqEqqqqqDqqDLqqqqe(payload, &code);
    if (code == COORDINATOR) {
      pn_terminus_set_type(rtgt, PN_COORDINATOR);
    } else if (code == TARGET) {
      pn_terminus_set_type(rtgt, PN_TARGET);
    } else {
      pn_terminus_set_type(rtgt, PN_UNSPECIFIED);
    }
  }

  /* Settle modes are only overwritten when the peer actually sent them. */
  if (snd_settle)
    link->remote_snd_settle_mode = snd_settle_mode;
  if (rcv_settle)
    link->remote_rcv_settle_mode = rcv_settle_mode;

  /* Reset, re-decode and rewind the remote source's composite fields. */
  pn_data_clear(link->remote_source.properties);
  pn_data_clear(link->remote_source.filter);
  pn_data_clear(link->remote_source.outcomes);
  pn_data_clear(link->remote_source.capabilities);

  pn_amqp_decode_DqEqqqqqDqEqqqqqCqCqCCee(payload,
                                          link->remote_source.properties,
                                          link->remote_source.filter,
                                          link->remote_source.outcomes,
                                          link->remote_source.capabilities);

  pn_data_rewind(link->remote_source.properties);
  pn_data_rewind(link->remote_source.filter);
  pn_data_rewind(link->remote_source.outcomes);
  pn_data_rewind(link->remote_source.capabilities);

  /* Same for the remote target. */
  pn_data_clear(link->remote_target.properties);
  pn_data_clear(link->remote_target.capabilities);

  if (pn_terminus_get_type(&link->remote_target) == PN_COORDINATOR) {
    // coordinator target only has a capabilities field
    pn_amqp_decode_DqEqqqqqDqqDqECeqqqe(payload,
                                        link->remote_target.capabilities);
  } else {
    pn_amqp_decode_DqEqqqqqDqqDqEqqqqqCCee(payload,
                                           link->remote_target.properties,
                                           link->remote_target.capabilities);
  }

  pn_data_rewind(link->remote_target.properties);
  pn_data_rewind(link->remote_target.capabilities);

  /* is_sender is the flag decoded from the frame; the decoded initial
     delivery count is stored only when it is false. */
  if (!is_sender) {
    link->state.delivery_count = idc;
  }

  /* A zero max message size leaves the link's current value untouched. */
  if (max_msgsz) {
    link->remote_max_message_size = max_msgsz;
  }

  pn_collector_put_object(transport->connection->collector, link, PN_LINK_REMOTE_OPEN);
  return 0;
}