void pn_proactor_disconnect()

in c/src/proactor/epoll.c [2845:2937]


/**
 * Disconnect every task currently on the proactor's task list.
 *
 * Works in two passes:
 *  1. Under p->task.mutex: detach the whole task list from the proactor,
 *     flag each task as disconnecting, give it two outstanding
 *     disconnect_ops, and adjust p->disconnects_pending / p->task_count.
 *  2. For each detached task, under the task's own mutex: if it is not
 *     already closing, start the close (connections: close the driver, or
 *     queue the disconnect when the task is currently working; listeners:
 *     listener_begin_close()).  Then, under p->task.mutex, drop one
 *     disconnect_op; if that was the last one, this function frees the task.
 *
 * @param p    proactor whose tasks are to be disconnected.
 * @param cond optional condition (may be NULL).  When non-NULL it is copied
 *             to each connection's transport condition (or to its deferred
 *             disconnect_condition) and to each listener's condition.
 */
void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
  // Set when the poller must be notified once all locks have been released.
  bool notify = false;

  lock(&p->task.mutex);
  // Move the whole tasks list into a disconnecting state
  task_t *disconnecting_tasks = p->tasks;
  p->tasks = NULL;
  // First pass: mark each task as disconnecting and update global pending count.
  task_t *tsk = disconnecting_tasks;
  while (tsk) {
    tsk->disconnecting = true;
    tsk->disconnect_ops = 2;   // Second pass below and proactor_remove(), in any order.
    p->disconnects_pending++;
    tsk = tsk->next;
    p->task_count--;
  }
  notify = schedule_if_inactive(p);
  unlock(&p->task.mutex);
  if (!disconnecting_tasks) {
    // Nothing to disconnect: only the possible poller notification remains.
    if (notify) notify_poller(p);
    return;
  }

  // Second pass: different locking, close the tasks, free them if !disconnect_ops
  task_t *next = disconnecting_tasks;
  while (next) {
    tsk = next;
    next = tsk->next;           /* Save next pointer in case we free tsk */
    bool do_free = false;       // true if this pass dropped the last disconnect_op
    bool tsk_notify = false;    // true if this pass initiated the close of tsk
    pmutex *tsk_mutex = NULL;
    // TODO: Need to extend this for raw connections too
    pconnection_t *pc = task_pconnection(tsk);
    if (pc) {
      tsk_mutex = &pc->task.mutex;
      lock(tsk_mutex);
      if (!tsk->closing) {
        tsk_notify = true;
        if (tsk->working) {
          // Must defer: record the request (and the condition, if any) on the connection.
          pc->queued_disconnect = true;
          if (cond) {
            if (!pc->disconnect_condition)
              pc->disconnect_condition = pn_condition();
            pn_condition_copy(pc->disconnect_condition, cond);
          }
        }
        else {
          // No conflicting working task.
          if (cond) {
            pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
          }
          pn_connection_driver_close(&pc->driver);
        }
      }
    } else {
      // Not a connection: treated as a listener.
      pn_listener_t *l = task_listener(tsk);
      assert(l);
      tsk_mutex = &l->task.mutex;
      lock(tsk_mutex);
      if (!tsk->closing) {
        tsk_notify = true;
        if (cond) {
          pn_condition_copy(pn_listener_condition(l), cond);
        }
        listener_begin_close(l);
      }
    }

    // Task mutex is still held here; the proactor mutex nests inside it.
    lock(&p->task.mutex);
    if (--tsk->disconnect_ops == 0) {
      do_free = true;
      tsk_notify = false;
      // Accumulate instead of overwriting: 'notify' is acted on only after
      // the loop, so a false result here must not discard a notification
      // requested by the first pass or by an earlier iteration.
      // NOTE(review): assumes a redundant notify_poller() is harmless - confirm.
      if (schedule_if_inactive(p))
        notify = true;
    } else {
      // If initiating the close, schedule the task to do the free.
      if (tsk_notify)
        tsk_notify = schedule(tsk);
      if (tsk_notify)
        notify_poller(p);
    }
    unlock(&p->task.mutex);
    unlock(tsk_mutex);

    // Unsafe to touch tsk after lock release, except if we are the designated final_free
    if (do_free) {
      if (pc) pconnection_final_free(pc);
      else listener_final_free(task_listener(tsk));
    }
  }
  if (notify)
    notify_poller(p);
}