int pn_do_attach()

in c/src/core/transport.c [1182:1320]


int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
{
  pn_bytes_t name;
  uint32_t handle;
  bool is_sender;
  pn_bytes_t source, target;
  pn_durability_t src_dr, tgt_dr;
  pn_bytes_t src_exp, tgt_exp;
  pn_seconds_t src_timeout, tgt_timeout;
  bool src_dynamic, tgt_dynamic;
  pn_sequence_t idc;
  pn_bytes_t dist_mode;
  bool snd_settle, rcv_settle;
  uint8_t snd_settle_mode, rcv_settle_mode;
  uint64_t max_msgsz;
  bool has_props;
  pn_bytes_t rem_props = (pn_bytes_t){0, NULL};
  pn_amqp_decode_DqESIoQBQBDqESIsIoqseDqESIsIoeqqILqqQRe(payload,
                                                         &name, &handle,
                                                         &is_sender,
                                                         &snd_settle, &snd_settle_mode,
                                                         &rcv_settle, &rcv_settle_mode,
                                                         &source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode,
                                                         &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
                                                         &idc, &max_msgsz, &has_props, &rem_props);
  pn_session_t *ssn = pni_channel_state(transport, channel);
  if (!ssn) {
    pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
    return PN_EOS;
  }
  if (handle > ssn->local_handle_max) {
    pn_do_error(transport, "amqp:connection:framing-error",
                "remote handle %u is above handle_max %u", handle, ssn->local_handle_max);
    return PN_EOS;
  }
  pn_link_t *link = pni_find_link(ssn, name, is_sender);
  if (link && (int32_t)link->state.remote_handle >= 0) {
    pn_do_error(transport, "amqp:invalid-field", "link name already attached: %.*s", (int)name.size, name.start);
    return PN_EOS;
  }
  if (!link) {                  /* Make a new link for the attach */
    if (is_sender) {
      link = (pn_link_t *) pn_link_new(SENDER, ssn, pn_stringn(name.start, name.size));
    } else {
      link = (pn_link_t *) pn_link_new(RECEIVER, ssn, pn_stringn(name.start, name.size));
    }
  }

  if (has_props) {
    pn_bytes_free(link->remote_properties_raw);
    link->remote_properties_raw = pn_bytes_dup(rem_props);
  }

  pni_map_remote_handle(link, handle);
  PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);
  pn_terminus_t *rsrc = &link->remote_source;
  if (source.start || src_dynamic) {
    pn_terminus_set_type(rsrc, PN_SOURCE);
    pn_terminus_set_address_bytes(rsrc, source);
    pn_terminus_set_durability(rsrc, src_dr);
    set_expiry_policy_from_symbol(rsrc, src_exp);
    pn_terminus_set_timeout(rsrc, src_timeout);
    pn_terminus_set_dynamic(rsrc, src_dynamic);
    pn_terminus_set_distribution_mode(rsrc, symbol2dist_mode(dist_mode));
  } else {
    pn_terminus_set_type(rsrc, PN_UNSPECIFIED);
  }
  pn_terminus_t *rtgt = &link->remote_target;
  if (target.start || tgt_dynamic) {
    pn_terminus_set_type(rtgt, PN_TARGET);
    pn_terminus_set_address_bytes(rtgt, target);
    pn_terminus_set_durability(rtgt, tgt_dr);
    set_expiry_policy_from_symbol(rtgt, tgt_exp);
    pn_terminus_set_timeout(rtgt, tgt_timeout);
    pn_terminus_set_dynamic(rtgt, tgt_dynamic);
  } else {
    uint64_t code = 0;
    pn_amqp_decode_DqEqqqqqDqqDLqqqqe(payload, &code);
    if (code == AMQP_DESC_COORDINATOR) {
      pn_terminus_set_type(rtgt, PN_COORDINATOR);
    } else if (code == AMQP_DESC_TARGET) {
      pn_terminus_set_type(rtgt, PN_TARGET);
    } else {
      pn_terminus_set_type(rtgt, PN_UNSPECIFIED);
    }
  }

  if (snd_settle)
    link->remote_snd_settle_mode = snd_settle_mode;
  if (rcv_settle)
    link->remote_rcv_settle_mode = rcv_settle_mode;

  pn_bytes_t rem_src_properties = (pn_bytes_t) {0, NULL};
  pn_bytes_t rem_src_filter = (pn_bytes_t) {0, NULL};
  pn_bytes_t rem_src_outcomes = (pn_bytes_t) {0, NULL};
  pn_bytes_t rem_src_capabilities = (pn_bytes_t) {0, NULL};
  pn_amqp_decode_DqEqqqqqDqEqqqqqRqRqRRee(payload,
                                          &rem_src_properties,
                                          &rem_src_filter,
                                          &rem_src_outcomes,
                                          &rem_src_capabilities);

  pn_bytes_free(rsrc->properties_raw);
  rsrc->properties_raw = pn_bytes_dup(rem_src_properties);
  pn_bytes_free(rsrc->filter_raw);
  rsrc->filter_raw = pn_bytes_dup(rem_src_filter);
  pn_bytes_free(rsrc->outcomes_raw);
  rsrc->outcomes_raw = pn_bytes_dup(rem_src_outcomes);
  pn_bytes_free(rsrc->capabilities_raw);
  rsrc->capabilities_raw = pn_bytes_dup(rem_src_capabilities);

  pn_bytes_t rem_tgt_properties = (pn_bytes_t) {0, NULL};
  pn_bytes_t rem_tgt_capabilities = (pn_bytes_t) {0, NULL};
  if (pn_terminus_get_type(&link->remote_target) == PN_COORDINATOR) {
    // coordinator target only has a capabilities field
    pn_amqp_decode_DqEqqqqqDqqDqEReqqqe(payload,
                                        &rem_tgt_capabilities);
  } else {
    pn_amqp_decode_DqEqqqqqDqqDqEqqqqqRRee(payload,
                                           &rem_tgt_properties,
                                           &rem_tgt_capabilities);
  }

  pn_bytes_free(rtgt->properties_raw);
  rtgt->properties_raw = pn_bytes_dup(rem_tgt_properties);
  pn_bytes_free(rtgt->capabilities_raw);
  rtgt->capabilities_raw = pn_bytes_dup(rem_tgt_capabilities);

  if (!is_sender) {
    link->state.delivery_count = idc;
  }

  if (max_msgsz) {
    link->remote_max_message_size = max_msgsz;
  }

  pn_collector_put_object(transport->connection->collector, link, PN_LINK_REMOTE_OPEN);
  return 0;
}