in src/server.c [1011:1120]
/*
 * Process a single proactor event.
 *
 * qd_server  server whose proactor, log source and container are used
 * e          the event being processed
 * pn_conn    proton connection associated with the event (may be NULL)
 * ctx        connection context associated with the event (may be NULL)
 *
 * Returns false only for PN_PROACTOR_INTERRUPT; every other event returns
 * true.  Unless the event belongs to an authentication-service connection,
 * it is also forwarded to the container whenever ctx is non-NULL.
 */
static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t *pn_conn, qd_connection_t *ctx)
{
    /* Authentication-service connections have a dedicated handler; nothing
     * below (including the container hand-off) runs for them. */
    if (pn_conn && qdr_is_authentication_service_connection(pn_conn)) {
        qdr_handle_authentication_service_connection_event(e);
        return true;
    }

    switch (pn_event_type(e)) {
    case PN_PROACTOR_INTERRUPT:
        /* Pass the interrupt on to the next thread, then stop this one. */
        pn_proactor_interrupt(qd_server->proactor);
        return false;

    case PN_PROACTOR_TIMEOUT:
        qd_timer_visit();
        break;

    case PN_LISTENER_OPEN:
    case PN_LISTENER_ACCEPT:
    case PN_LISTENER_CLOSE:
        do_handle_listener(e, qd_server);
        break;

    case PN_CONNECTION_INIT:
        /* Only connections that came from a listener get a handshake timer,
         * and only when the listener configures a positive timeout. */
        if (ctx && ctx->listener) {
            const qd_server_config_t *listener_cfg = &ctx->listener->config;
            if (listener_cfg->initial_handshake_timeout_seconds > 0) {
                ctx->timer = qd_timer(qd_server->qd, startup_timer_handler, ctx);
                /* Configured value is in seconds; scaled by 1000 for scheduling. */
                qd_timer_schedule(ctx->timer, listener_cfg->initial_handshake_timeout_seconds * 1000);
            }
        }
        break;

    case PN_CONNECTION_BOUND:
        on_connection_bound(qd_server, e);
        break;

    case PN_CONNECTION_REMOTE_OPEN:
        if (ctx) {
            /* Release any timer still attached to this connection. */
            if (ctx->timer) {
                qd_timer_free(ctx->timer);
                ctx->timer = 0;
            }
            /* First transition to the opened state. */
            if (!ctx->opened) {
                ctx->opened = true;
                if (ctx->connector) {
                    /* Delay re-connect in case there is a recurring error */
                    ctx->connector->delay = 2000;
                    qd_failover_item_t *failover = qd_connector_get_conn_info(ctx->connector);
                    if (failover) {
                        failover->retries = 0;
                    }
                }
            }
        }
        break;

    case PN_CONNECTION_WAKE:
        invoke_deferred_calls(ctx, false);
        break;

    case PN_TRANSPORT_ERROR: {
        enum { CONN_MSG_CAPACITY = 300 };

        pn_transport_t *tport = pn_event_transport(e);
        pn_condition_t *cond  = tport ? pn_transport_condition(tport) : NULL;

        if (!ctx) {
            break;
        }

        if (ctx->connector) {
            /* Outgoing connection */
            qd_increment_conn_index(ctx);
            const qd_server_config_t *connector_cfg = &ctx->connector->config;
            // note: will transition back to STATE_CONNECTING when ctx is freed (pn_connection_free)
            ctx->connector->state = CXTR_STATE_FAILED;

            char msg[CONN_MSG_CAPACITY];
            if (cond && pn_condition_is_set(cond)) {
                qd_format_string(msg, CONN_MSG_CAPACITY, "[C%"PRIu64"] Connection to %s failed: %s %s",
                                 ctx->connection_id, connector_cfg->host_port,
                                 pn_condition_get_name(cond), pn_condition_get_description(cond));
            } else {
                qd_format_string(msg, CONN_MSG_CAPACITY, "[C%"PRIu64"] Connection to %s failed",
                                 ctx->connection_id, connector_cfg->host_port);
            }

            /* Both variants are recorded on the connector and logged the same way. */
            strcpy(ctx->connector->conn_msg, msg);
            qd_log(qd_server->log_source, QD_LOG_INFO, "%s", msg);
        } else if (ctx->listener) {
            /* Incoming connection: logged only when an error condition is set. */
            if (cond && pn_condition_is_set(cond)) {
                qd_log(ctx->server->log_source, QD_LOG_INFO, "[C%"PRIu64"] Connection from %s (to %s) failed: %s %s",
                       ctx->connection_id, ctx->rhost_port, ctx->listener->config.host_port,
                       pn_condition_get_name(cond), pn_condition_get_description(cond));
            }
        }
        break;
    }

    case PN_RAW_CONNECTION_CONNECTED:
    case PN_RAW_CONNECTION_CLOSED_READ:
    case PN_RAW_CONNECTION_CLOSED_WRITE:
    case PN_RAW_CONNECTION_DISCONNECTED:
    case PN_RAW_CONNECTION_NEED_READ_BUFFERS:
    case PN_RAW_CONNECTION_NEED_WRITE_BUFFERS:
    case PN_RAW_CONNECTION_READ:
    case PN_RAW_CONNECTION_WRITTEN:
    case PN_RAW_CONNECTION_WAKE:
        do_handle_raw_connection_event(e, qd_server);
        break;

    default:
        break;
    }

    /* Events with a connection context are also passed to the container. */
    if (ctx) {
        qd_container_handle_event(qd_server->container, e, pn_conn, ctx);
    }
    return true;
}