in src/router_node.c [1193:1396]
/**
 * Return true when the byte range [start, start + size) spells exactly the
 * NUL-terminated string 'literal' (same length, same bytes).
 */
static bool amqp_opened_symbol_is(const char *start, size_t size, const char *literal)
{
    return size == strlen(literal) && strncmp(start, literal, size) == 0;
}

/**
 * Handle the opening of an AMQP connection.
 *
 * Collects what is known about the connection (transport user, SASL
 * mechanism, SSL protocol/cipher, host, configured role/cost/link capacity),
 * scans the remote offered capabilities for QD_CAPABILITY_STREAMING_LINKS,
 * parses the remote connection properties (cost, failover list, version,
 * annotations version), then reports the connection to the router core via
 * qdr_connection_opened().  When the connection has a connector, a summary
 * message is stored in the connector's conn_msg under the connector lock.
 *
 * @param router   Router whose core and log source are used.
 * @param conn     The connection that was opened.
 * @param inbound  Passed through to qdr_connection_opened() and used to label
 *                 the direction ("in"/"out") in the connector message.
 */
static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool inbound)
{
    qdr_connection_role_t  role            = 0;
    int                    cost            = 1;
    int                    link_capacity   = 1;
    const char            *name            = 0;
    bool                   multi_tenant    = false;
    bool                   streaming_links = false;
    const char            *vhost           = 0;
    char                   rversion[128];
    uint64_t               connection_id   = qd_connection_connection_id(conn);
    pn_connection_t       *pn_conn         = qd_connection_pn(conn);
    pn_transport_t        *tport           = 0;
    pn_sasl_t             *sasl            = 0;
    pn_ssl_t              *ssl             = 0;
    const char            *mech            = 0;
    const char            *user            = 0;
    const char            *container       = conn->pn_conn ? pn_connection_remote_container(conn->pn_conn) : 0;

    rversion[0] = 0;
    conn->strip_annotations_in  = false;
    conn->strip_annotations_out = false;

    if (conn->pn_conn) {
        tport = pn_connection_transport(conn->pn_conn);
        ssl   = conn->ssl;
    }

    if (tport) {
        sasl = pn_sasl(tport);
        // An explicitly recorded user id takes precedence over the transport's user.
        if (conn->user_id)
            user = conn->user_id;
        else
            user = pn_transport_get_user(tport);
    }

    if (sasl)
        mech = pn_sasl_get_mech(sasl);

    //
    // Outgoing connections (those with a connector) are identified by the
    // connector's configured host:port; others by the connection name.
    //
    const char *host = 0;
    char        host_local[255];
    if (qd_connection_connector(conn)) {
        const qd_server_config_t *config = qd_connector_config(qd_connection_connector(conn));
        snprintf(host_local, sizeof(host_local), "%s", config->host_port);
        host = &host_local[0];
    } else {
        host = qd_connection_name(conn);
    }

    qd_router_connection_get_config(conn, &role, &cost, &name, &multi_tenant,
                                    &conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);

    // check offered capabilities for streaming link support
    //
    // pn_conn is guarded here the same way it is guarded for the remote
    // properties below.
    //
    pn_data_t *ocaps = pn_conn ? pn_connection_remote_offered_capabilities(pn_conn) : 0;
    if (ocaps) {
        pn_data_rewind(ocaps);
        if (pn_data_next(ocaps)) {
            // The capabilities may be a single symbol or an array of symbols.
            if (pn_data_type(ocaps) == PN_ARRAY) {
                pn_data_enter(ocaps);
                pn_data_next(ocaps);
            }
            do {
                if (pn_data_type(ocaps) == PN_SYMBOL) {
                    pn_bytes_t s = pn_data_get_symbol(ocaps);
                    streaming_links = amqp_opened_symbol_is(s.start, s.size, QD_CAPABILITY_STREAMING_LINKS);
                }
            } while (pn_data_next(ocaps) && !streaming_links);
        }
    }

    // if connection properties are present parse out any important data
    //
    pn_data_t *props = pn_conn ? pn_connection_remote_properties(pn_conn) : 0;
    if (props) {
        const bool is_router = (role == QDR_ROLE_INTER_ROUTER || role == QDR_ROLE_EDGE_CONNECTION);
        pn_data_rewind(props);
        if (pn_data_next(props) && pn_data_type(props) == PN_MAP) {
            const size_t num_items   = pn_data_get_map(props);
            int          props_found = 0;  // once all props found exit loop
            pn_data_enter(props);

            // The map is walked as key/value pairs, hence num_items / 2 iterations.
            for (size_t i = 0; i < num_items / 2 && props_found < 4; ++i) {
                if (!pn_data_next(props)) break;
                if (pn_data_type(props) != PN_SYMBOL) break;  // invalid properties map
                pn_bytes_t key = pn_data_get_symbol(props);

                if (amqp_opened_symbol_is(key.start, key.size, QD_CONNECTION_PROPERTY_COST_KEY)) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    if (is_router && pn_data_type(props) == PN_INT) {
                        // The larger of the local and remote cost is used.
                        const int remote_cost = (int) pn_data_get_int(props);
                        if (remote_cost > cost)
                            cost = remote_cost;
                    }

                } else if (amqp_opened_symbol_is(key.start, key.size, QD_CONNECTION_PROPERTY_FAILOVER_LIST_KEY)) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    parse_failover_property_list(router, conn, props);

                } else if (amqp_opened_symbol_is(key.start, key.size, QD_CONNECTION_PROPERTY_VERSION_KEY)) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    if (is_router) {
                        // Copy at most sizeof(rversion) - 1 bytes and always
                        // terminate.  A missing data pointer yields an empty
                        // version rather than a copy from NULL.
                        pn_bytes_t vdata = pn_data_get_string(props);
                        size_t     vlen  = vdata.start ? MIN(sizeof(rversion) - 1, vdata.size) : 0;
                        if (vlen > 0)
                            memcpy(rversion, vdata.start, vlen);
                        rversion[vlen] = 0;
                    }

                } else if (amqp_opened_symbol_is(key.start, key.size, QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY)) {
                    props_found += 1;
                    if (!pn_data_next(props)) break;
                    if (is_router && pn_data_type(props) == PN_INT) {
                        const int annos_version = (int) pn_data_get_int(props);
                        qd_log(router->log_source, QD_LOG_DEBUG,
                               "Remote router annotations version: %d", annos_version);
                    }

                } else {
                    // skip this key
                    if (!pn_data_next(props)) break;
                }
            }
        }
    }

    //
    // For multi-tenant connections the vhost comes from the policy settings
    // when available, otherwise from the remote hostname of the connection.
    //
    if (multi_tenant) {
        if (conn->policy_settings && conn->policy_settings->vhost_name)
            vhost = conn->policy_settings->vhost_name;
        else
            vhost = pn_conn ? pn_connection_remote_hostname(pn_conn) : 0;
    }

    char proto[50];
    memset(proto, 0, sizeof(proto));
    char cipher[50];
    memset(cipher, 0, sizeof(cipher));

    int  ssl_ssf = 0;
    bool is_ssl  = false;

    if (ssl) {
        pn_ssl_get_protocol_name(ssl, proto, sizeof(proto));
        pn_ssl_get_cipher_name(ssl, cipher, sizeof(cipher));
        ssl_ssf = pn_ssl_get_ssf(ssl);
        is_ssl  = true;
    }

    bool encrypted     = tport && pn_transport_is_encrypted(tport);
    bool authenticated = tport && pn_transport_is_authenticated(tport);

    qdr_connection_info_t *connection_info = qdr_connection_info(encrypted,
                                                                 authenticated,
                                                                 conn->opened,
                                                                 (char*) mech,
                                                                 conn->connector ? QD_OUTGOING : QD_INCOMING,
                                                                 host,
                                                                 proto,
                                                                 cipher,
                                                                 (char*) user,
                                                                 container,
                                                                 props,
                                                                 ssl_ssf,
                                                                 is_ssl,
                                                                 rversion,
                                                                 streaming_links);

    qdr_connection_opened(router->router_core,
                          amqp_direct_adaptor,
                          inbound,
                          role,
                          cost,
                          connection_id,
                          name,
                          pn_conn ? pn_connection_remote_container(pn_conn) : 0,
                          conn->strip_annotations_in,
                          conn->strip_annotations_out,
                          link_capacity,
                          vhost,
                          !!conn->policy_settings ? &conn->policy_settings->spec : 0,
                          connection_info,
                          bind_connection_context,
                          conn);

    if (conn->connector) {
        char conn_msg[300];

        // mech, user and container may legitimately be NULL; never hand a
        // NULL pointer to a %s conversion (vhost was already guarded).
        qd_format_string(conn_msg, sizeof(conn_msg),
                         "[C%"PRIu64"] Connection Opened: dir=%s host=%s vhost=%s encrypted=%s"
                         " auth=%s user=%s container_id=%s",
                         connection_id,
                         inbound ? "in" : "out",
                         host,
                         vhost ? vhost : "",
                         encrypted ? proto : "no",
                         authenticated ? (mech ? mech : "") : "no",
                         user ? user : "",
                         container ? container : "");

        // NOTE(review): the destination buffer's size is not visible from
        // here; this assumes connector->conn_msg holds at least
        // sizeof(conn_msg) bytes -- confirm against the connector definition.
        sys_mutex_lock(conn->connector->lock);
        strcpy(conn->connector->conn_msg, conn_msg);
        sys_mutex_unlock(conn->connector->lock);
    }
}