static void AMQP_opened_handler()

in src/router_node.c [1193:1396]


static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool inbound)
{
    qdr_connection_role_t  role = 0;
    int                    cost = 1;
    int                    link_capacity = 1;
    const char            *name = 0;
    bool                   multi_tenant = false;
    bool                   streaming_links = false;
    const char            *vhost = 0;
    char                   rversion[128];
    uint64_t               connection_id = qd_connection_connection_id(conn);
    pn_connection_t       *pn_conn = qd_connection_pn(conn);
    pn_transport_t *tport = 0;
    pn_sasl_t      *sasl  = 0;
    pn_ssl_t       *ssl   = 0;
    const char     *mech  = 0;
    const char     *user  = 0;
    const char *container = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0;

    rversion[0] = 0;
    conn->strip_annotations_in  = false;
    conn->strip_annotations_out = false;
    if (conn->pn_conn) {
        tport = pn_connection_transport(conn->pn_conn);
        ssl   = conn->ssl;
    }
    if (tport) {
        sasl = pn_sasl(tport);
        if(conn->user_id)
            user = conn->user_id;
        else
            user = pn_transport_get_user(tport);
    }

    if (sasl)
        mech = pn_sasl_get_mech(sasl);

    const char *host = 0;
    char host_local[255];
    const qd_server_config_t *config;
    if (qd_connection_connector(conn)) {
        config = qd_connector_config(qd_connection_connector(conn));
        snprintf(host_local, 254, "%s", config->host_port);
        host = &host_local[0];
    }
    else
        host = qd_connection_name(conn);


    qd_router_connection_get_config(conn, &role, &cost, &name, &multi_tenant,
                                    &conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);

    // check offered capabilities for streaming link support
    //
    pn_data_t *ocaps = pn_connection_remote_offered_capabilities(pn_conn);
    if (ocaps) {
        size_t sl_len = strlen(QD_CAPABILITY_STREAMING_LINKS);
        pn_data_rewind(ocaps);
        if (pn_data_next(ocaps)) {
            if (pn_data_type(ocaps) == PN_ARRAY) {
                pn_data_enter(ocaps);
                pn_data_next(ocaps);
            }
            do {
                if (pn_data_type(ocaps) == PN_SYMBOL) {
                    pn_bytes_t s = pn_data_get_symbol(ocaps);
                    streaming_links = (s.size == sl_len
                                       && strncmp(s.start, QD_CAPABILITY_STREAMING_LINKS, sl_len) == 0);
                }
            } while (pn_data_next(ocaps) && !streaming_links);
        }
    }

    // if connection properties are present parse out any important data
    //
    pn_data_t *props = pn_conn ? pn_connection_remote_properties(pn_conn) : 0;
    if (props) {
        const bool is_router = (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_EDGE_CONNECTION);
        pn_data_rewind(props);
        if (pn_data_next(props) && pn_data_type(props) == PN_MAP) {
            const size_t num_items = pn_data_get_map(props);
            int props_found = 0;  // once all props found exit loop
            pn_data_enter(props);
            for (int i = 0; i < num_items / 2 && props_found < 4; ++i) {
                if (!pn_data_next(props)) break;
                if (pn_data_type(props) != PN_SYMBOL) break;  // invalid properties map
                pn_bytes_t key = pn_data_get_symbol(props);

                if (key.size == strlen(QD_CONNECTION_PROPERTY_COST_KEY) &&
                    strncmp(key.start, QD_CONNECTION_PROPERTY_COST_KEY, key.size) == 0) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    if (is_router) {
                        if (pn_data_type(props) == PN_INT) {
                            const int remote_cost = (int) pn_data_get_int(props);
                            if (remote_cost > cost)
                                cost = remote_cost;
                        }
                    }

                } else if (key.size == strlen(QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY) &&
                           strncmp(key.start, QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY, key.size) == 0) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    parse_failover_property_list(router, conn, props);

                } else if (key.size == strlen(QD_CONNECTION_PROPERTY_VERSION_KEY)
                           && strncmp(key.start, QD_CONNECTION_PROPERTY_VERSION_KEY, key.size) == 0) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    if (is_router) {
                        pn_bytes_t vdata = pn_data_get_string(props);
                        size_t vlen = MIN(sizeof(rversion) - 1, vdata.size);
                        strncpy(rversion, vdata.start, vlen);
                        rversion[vlen] = 0;
                    }

                } else if ((key.size == strlen(QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY)
                           && strncmp(key.start, QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY, key.size) == 0)) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    if (is_router && pn_data_type(props) == PN_INT) {
                        const int annos_version = (int) pn_data_get_int(props);
                        qd_log(router->log_source, QD_LOG_DEBUG,
                               "Remote router annotations version: %d", annos_version);
                    }

                } else {
                    // skip this key
                    if (!pn_data_next(props)) break;
                }
            }
        }
    }


    if (multi_tenant)
        vhost = (conn->policy_settings && conn->policy_settings->vhost_name) ?
                conn->policy_settings->vhost_name :
                pn_connection_remote_hostname(pn_conn);

    char proto[50];
    memset(proto, 0, 50);
    char cipher[50];
    memset(cipher, 0, 50);

    int ssl_ssf = 0;
    bool is_ssl = false;

    if (ssl) {
        pn_ssl_get_protocol_name(ssl, proto, 50);
        pn_ssl_get_cipher_name(ssl, cipher, 50);
        ssl_ssf = pn_ssl_get_ssf(ssl);
        is_ssl = true;
    }


    bool encrypted     = tport && pn_transport_is_encrypted(tport);
    bool authenticated = tport && pn_transport_is_authenticated(tport);

    qdr_connection_info_t *connection_info = qdr_connection_info(encrypted,
                                                                 authenticated,
                                                                 conn->opened,
                                                                 (char*) mech,
                                                                 conn->connector ? QD_OUTGOING : QD_INCOMING,
                                                                 host,
                                                                 proto,
                                                                 cipher,
                                                                 (char*) user,
                                                                 container,
                                                                 props,
                                                                 ssl_ssf,
                                                                 is_ssl,
                                                                 rversion,
                                                                 streaming_links);

    qdr_connection_opened(router->router_core,
                          amqp_direct_adaptor,
                          inbound,
                          role,
                          cost,
                          connection_id,
                          name,
                          pn_connection_remote_container(pn_conn),
                          conn->strip_annotations_in,
                          conn->strip_annotations_out,
                          link_capacity,
                          vhost,
                          !!conn->policy_settings ? &conn->policy_settings->spec : 0,
                          connection_info,
                          bind_connection_context,
                          conn);

    if (conn->connector) {
        char conn_msg[300];
        qd_format_string(conn_msg, 300, "[C%"PRIu64"] Connection Opened: dir=%s host=%s vhost=%s encrypted=%s"
                " auth=%s user=%s container_id=%s",
                connection_id, inbound ? "in" : "out", host, vhost ? vhost : "", encrypted ? proto : "no",
                        authenticated ? mech : "no", (char*) user, container);
        sys_mutex_lock(conn->connector->lock);
        strcpy(conn->connector->conn_msg, conn_msg);
        sys_mutex_unlock(conn->connector->lock);
    }
}