in cpp/src/messaging_adapter.cpp [113:188]
void on_delivery(messaging_handler& handler, pn_event_t* event) {
pn_link_t *lnk = pn_event_link(event);
pn_delivery_t *dlv = pn_event_delivery(event);
link_context& lctx = link_context::get(lnk);
Tracing& ot = Tracing::getTracing();
if (pn_link_is_receiver(lnk)) {
delivery d(make_wrapper<delivery>(dlv));
if (pn_delivery_aborted(dlv)) {
pn_delivery_settle(dlv);
pn_link_flow(lnk, 1);
pn_link_advance(lnk);
}
else if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) {
// generate on_message
pn_connection_t *pnc = pn_session_connection(pn_link_session(lnk));
connection_context& ctx = connection_context::get(pnc);
// Reusable per-connection message.
// Avoid expensive heap malloc/free overhead.
// See PROTON-998
class message &msg(ctx.event_message);
message_decode(msg, d);
if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
if (lctx.auto_accept)
d.release();
} else {
ot.on_message_handler(handler, d, msg);
if (lctx.auto_accept && pn_delivery_local_state(dlv) == 0) // Not set by handler
d.accept();
if (lctx.draining && !pn_link_credit(lnk)) {
lctx.draining = false;
pn_link_set_drain(lnk, false);
receiver r(make_wrapper<receiver>(lnk));
handler.on_receiver_drain_finish(r);
}
}
}
else if (pn_delivery_updated(dlv) && d.settled()) {
handler.on_delivery_settle(d);
}
if (lctx.draining && pn_link_credit(lnk) == 0) {
lctx.draining = false;
pn_link_set_drain(lnk, false);
receiver r(make_wrapper<receiver>(lnk));
handler.on_receiver_drain_finish(r);
if (lctx.pending_credit) {
pn_link_flow(lnk, lctx.pending_credit);
lctx.pending_credit = 0;
}
}
credit_topup(lnk);
} else {
// sender
if (pn_delivery_updated(dlv)) {
tracker t(make_wrapper<tracker>(dlv));
ot.on_settled_span(t);
switch(pn_delivery_remote_state(dlv)) {
case PN_ACCEPTED:
handler.on_tracker_accept(t);
break;
case PN_REJECTED:
handler.on_tracker_reject(t);
break;
case PN_RELEASED:
case PN_MODIFIED:
handler.on_tracker_release(t);
break;
}
if (t.settled()) {
handler.on_tracker_settle(t);
if (lctx.auto_settle)
t.settle();
}
}
}
}