in c/src/proactor/epoll.c [2845:2937]
void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
bool notify = false;
lock(&p->task.mutex);
// Move the whole tasks list into a disconnecting state
task_t *disconnecting_tasks = p->tasks;
p->tasks = NULL;
// First pass: mark each task as disconnecting and update global pending count.
task_t *tsk = disconnecting_tasks;
while (tsk) {
tsk->disconnecting = true;
tsk->disconnect_ops = 2; // Second pass below and proactor_remove(), in any order.
p->disconnects_pending++;
tsk = tsk->next;
p->task_count--;
}
notify = schedule_if_inactive(p);
unlock(&p->task.mutex);
if (!disconnecting_tasks) {
if (notify) notify_poller(p);
return;
}
// Second pass: different locking, close the tasks, free them if !disconnect_ops
task_t *next = disconnecting_tasks;
while (next) {
tsk = next;
next = tsk->next; /* Save next pointer in case we free tsk */
bool do_free = false;
bool tsk_notify = false;
pmutex *tsk_mutex = NULL;
// TODO: Need to extend this for raw connections too
pconnection_t *pc = task_pconnection(tsk);
if (pc) {
tsk_mutex = &pc->task.mutex;
lock(tsk_mutex);
if (!tsk->closing) {
tsk_notify = true;
if (tsk->working) {
// Must defer
pc->queued_disconnect = true;
if (cond) {
if (!pc->disconnect_condition)
pc->disconnect_condition = pn_condition();
pn_condition_copy(pc->disconnect_condition, cond);
}
}
else {
// No conflicting working task.
if (cond) {
pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
}
pn_connection_driver_close(&pc->driver);
}
}
} else {
pn_listener_t *l = task_listener(tsk);
assert(l);
tsk_mutex = &l->task.mutex;
lock(tsk_mutex);
if (!tsk->closing) {
tsk_notify = true;
if (cond) {
pn_condition_copy(pn_listener_condition(l), cond);
}
listener_begin_close(l);
}
}
lock(&p->task.mutex);
if (--tsk->disconnect_ops == 0) {
do_free = true;
tsk_notify = false;
notify = schedule_if_inactive(p);
} else {
// If initiating the close, schedule the task to do the free.
if (tsk_notify)
tsk_notify = schedule(tsk);
if (tsk_notify)
notify_poller(p);
}
unlock(&p->task.mutex);
unlock(tsk_mutex);
// Unsafe to touch tsk after lock release, except if we are the designated final_free
if (do_free) {
if (pc) pconnection_final_free(pc);
else listener_final_free(task_listener(tsk));
}
}
if (notify)
notify_poller(p);
}