in src/router_core/connections.c [311:488]
int qdr_connection_process(qdr_connection_t *conn)
{
qdr_connection_work_list_t work_list;
qdr_link_ref_list_t links_with_work[QDR_N_PRIORITIES];
qdr_core_t *core = conn->core;
qdr_link_ref_t *ref;
qdr_link_t *link;
bool detach_sent;
int event_count = 0;
if (conn->closed) {
conn->protocol_adaptor->conn_close_handler(conn->protocol_adaptor->user_context, conn, conn->error);
return 0;
}
sys_mutex_lock(conn->work_lock);
DEQ_MOVE(conn->work_list, work_list);
for (int priority = 0; priority <= QDR_MAX_PRIORITY; ++ priority) {
DEQ_MOVE(conn->links_with_work[priority], links_with_work[priority]);
//
// Move the references from CLASS_WORK to CLASS_LOCAL so concurrent action in the core
// thread doesn't assume these links are referenced from the connection's list.
//
ref = DEQ_HEAD(links_with_work[priority]);
while (ref) {
move_link_ref(ref->link, QDR_LINK_LIST_CLASS_WORK, QDR_LINK_LIST_CLASS_LOCAL);
ref->link->processing = true;
ref = DEQ_NEXT(ref);
}
}
sys_mutex_unlock(conn->work_lock);
event_count += DEQ_SIZE(work_list);
qdr_connection_work_t *work = DEQ_HEAD(work_list);
while (work) {
DEQ_REMOVE_HEAD(work_list);
switch (work->work_type) {
case QDR_CONNECTION_WORK_FIRST_ATTACH :
conn->protocol_adaptor->first_attach_handler(conn->protocol_adaptor->user_context, conn, work->link, work->source, work->target, work->ssn_class);
break;
case QDR_CONNECTION_WORK_SECOND_ATTACH :
conn->protocol_adaptor->second_attach_handler(conn->protocol_adaptor->user_context, work->link, work->source, work->target);
break;
case QDR_CONNECTION_WORK_TRACING_ON :
conn->protocol_adaptor->conn_trace_handler(conn->protocol_adaptor->user_context, conn, true);
break;
case QDR_CONNECTION_WORK_TRACING_OFF :
conn->protocol_adaptor->conn_trace_handler(conn->protocol_adaptor->user_context, conn, false);
break;
}
qdr_connection_work_free_CT(work);
work = DEQ_HEAD(work_list);
}
// Process the links_with_work array from highest to lowest priority.
for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) {
ref = DEQ_HEAD(links_with_work[priority]);
while (ref) {
qdr_link_work_t *link_work;
detach_sent = false;
link = ref->link;
//
// The work lock must be used to protect accesses to the link's work_list and
// link_work->processing.
//
sys_mutex_lock(conn->work_lock);
link_work = DEQ_HEAD(link->work_list);
if (link_work) {
// link_work ref transfered to local link_work
DEQ_REMOVE_HEAD(link->work_list);
link_work->processing = true;
}
sys_mutex_unlock(conn->work_lock);
//
// Handle disposition/settlement updates
//
qdr_delivery_ref_list_t updated_deliveries;
sys_mutex_lock(conn->work_lock);
DEQ_MOVE(link->updated_deliveries, updated_deliveries);
sys_mutex_unlock(conn->work_lock);
qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
while (dref) {
conn->protocol_adaptor->delivery_update_handler(conn->protocol_adaptor->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled);
qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list");
qdr_del_delivery_ref(&updated_deliveries, dref);
dref = DEQ_HEAD(updated_deliveries);
event_count++;
}
while (link_work) {
switch (link_work->work_type) {
case QDR_LINK_WORK_DELIVERY :
{
int count = conn->protocol_adaptor->push_handler(conn->protocol_adaptor->user_context, link, link_work->value);
assert(count <= link_work->value);
link_work->value -= count;
break;
}
case QDR_LINK_WORK_FLOW :
if (link_work->value > 0)
conn->protocol_adaptor->flow_handler(conn->protocol_adaptor->user_context, link, link_work->value);
if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET)
conn->protocol_adaptor->drain_handler(conn->protocol_adaptor->user_context, link, true);
else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
conn->protocol_adaptor->drain_handler(conn->protocol_adaptor->user_context, link, false);
else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
conn->protocol_adaptor->drained_handler(conn->protocol_adaptor->user_context, link);
break;
case QDR_LINK_WORK_FIRST_DETACH :
case QDR_LINK_WORK_SECOND_DETACH :
conn->protocol_adaptor->detach_handler(conn->protocol_adaptor->user_context, link, link_work->error,
link_work->work_type == QDR_LINK_WORK_FIRST_DETACH,
link_work->close_link);
detach_sent = true;
break;
}
sys_mutex_lock(conn->work_lock);
if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) {
// link_work ref transfered from link_work to work_list
DEQ_INSERT_HEAD(link->work_list, link_work);
link_work->processing = false;
link_work = 0; // Halt work processing
} else {
qdr_link_work_release(link_work);
link_work = DEQ_HEAD(link->work_list);
if (link_work) {
// link_work ref transfered to local link_work
DEQ_REMOVE_HEAD(link->work_list);
link_work->processing = true;
}
}
sys_mutex_unlock(conn->work_lock);
event_count++;
}
if (detach_sent) {
// let the core thread know so it can clean up
qdr_link_detach_sent(link);
} else
qdr_record_link_credit(core, link);
ref = DEQ_NEXT(ref);
}
}
sys_mutex_lock(conn->work_lock);
for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) {
ref = DEQ_HEAD(links_with_work[priority]);
while (ref) {
qdr_link_t *link = ref->link;
link->processing = false;
if (link->ready_to_free)
qdr_link_processing_complete(core, link);
qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_LOCAL);
ref = DEQ_HEAD(links_with_work[priority]);
}
}
sys_mutex_unlock(conn->work_lock);
return event_count;
}