void on_delivery()

in cpp/src/messaging_adapter.cpp [113:188]


// Dispatch a delivery event to the messaging_handler.
//
// The link and delivery are taken from `event`. The work done depends on
// the direction of the link:
//  - sender:   when the delivery was updated, the remote state is mapped to
//              on_tracker_accept / on_tracker_reject / on_tracker_release,
//              followed by on_tracker_settle if the tracker reports settled.
//  - receiver: aborted deliveries are settled, complete readable deliveries
//              are decoded and handed to the message handler, and updated +
//              settled deliveries trigger on_delivery_settle. Drain and
//              credit bookkeeping runs afterwards in every receiver case.
void on_delivery(messaging_handler& handler, pn_event_t* event) {
    pn_link_t *plink = pn_event_link(event);
    pn_delivery_t *pdlv = pn_event_delivery(event);
    link_context& link_ctx = link_context::get(plink);
    Tracing& tracing = Tracing::getTracing();

    if (!pn_link_is_receiver(plink)) {
        // Sender side: nothing to report unless the delivery was updated.
        if (!pn_delivery_updated(pdlv))
            return;

        tracker trk(make_wrapper<tracker>(pdlv));
        tracing.on_settled_span(trk);

        // Remote states other than the four below produce no outcome callback.
        switch (pn_delivery_remote_state(pdlv)) {
        case PN_ACCEPTED:
            handler.on_tracker_accept(trk);
            break;
        case PN_REJECTED:
            handler.on_tracker_reject(trk);
            break;
        case PN_RELEASED:
        case PN_MODIFIED:
            // Released and modified are both reported as a release.
            handler.on_tracker_release(trk);
            break;
        }

        if (trk.settled()) {
            // The handler is notified first; local settlement follows only
            // when auto_settle is enabled for this link.
            handler.on_tracker_settle(trk);
            if (link_ctx.auto_settle)
                trk.settle();
        }
        return;
    }

    // Receiver side.
    delivery dlvry(make_wrapper<delivery>(pdlv));

    if (pn_delivery_aborted(pdlv)) {
        // Aborted transfer: settle it, grant one credit and advance the link.
        // NOTE(review): confirm whether pn_delivery_settle() already advances
        // the link for a current delivery; if so the explicit advance below
        // may be redundant.
        pn_delivery_settle(pdlv);
        pn_link_flow(plink, 1);
        pn_link_advance(plink);
    }
    else if (!pn_delivery_partial(pdlv) && pn_delivery_readable(pdlv)) {
        // A complete, readable delivery: decode it and generate on_message.
        pn_connection_t *conn = pn_session_connection(pn_link_session(plink));
        connection_context& conn_ctx = connection_context::get(conn);

        // Decode into the message object kept on the connection context
        // instead of building a new one for every delivery; this avoids
        // expensive heap malloc/free overhead (see PROTON-998).
        class message &incoming = conn_ctx.event_message;
        message_decode(incoming, dlvry);

        if (!(pn_link_state(plink) & PN_LOCAL_CLOSED)) {
            tracing.on_message_handler(handler, dlvry, incoming);

            // Auto-accept only if the handler left the local state unset.
            if (link_ctx.auto_accept && pn_delivery_local_state(pdlv) == 0)
                dlvry.accept();

            // Draining with no credit left: the drain is finished.
            if (link_ctx.draining && pn_link_credit(plink) == 0) {
                link_ctx.draining = false;
                pn_link_set_drain(plink, false);
                receiver rcv(make_wrapper<receiver>(plink));
                handler.on_receiver_drain_finish(rcv);
            }
        }
        else if (link_ctx.auto_accept) {
            // The link is locally closed, so the handler is not invoked;
            // with auto_accept on, the delivery is released instead.
            dlvry.release();
        }
    }
    else if (pn_delivery_updated(pdlv) && dlvry.settled()) {
        handler.on_delivery_settle(dlvry);
    }

    // Drain-finish check for every receiver path. It is skipped when the
    // message path above already cleared `draining`.
    // NOTE(review): only this copy of the check issues pending_credit; the
    // one inside the message path does not — confirm that is intended.
    if (link_ctx.draining && pn_link_credit(plink) == 0) {
        link_ctx.draining = false;
        pn_link_set_drain(plink, false);
        receiver rcv(make_wrapper<receiver>(plink));
        handler.on_receiver_drain_finish(rcv);
        if (link_ctx.pending_credit) {
            pn_link_flow(plink, link_ctx.pending_credit);
            link_ctx.pending_credit = 0;
        }
    }

    credit_topup(plink);
}