in c/tools/reactor-recv.c [140:248]
/*
 * Per-connection event handler for the receiving side.
 *
 * h     - handler whose connection context is obtained via connection_context()
 * event - the event being dispatched
 * type  - the type of the event; selects the action below
 *
 * Handled event types (everything else is ignored):
 *   PN_LINK_REMOTE_OPEN   - remember the incoming receiver link, then either
 *                           open a reply sender link (reply mode) or attach a
 *                           flow controller to this handler.
 *   PN_LINK_FLOW          - reply mode only: grant the incoming link enough
 *                           credit to match the credit held on the reply link.
 *   PN_DELIVERY           - read, decode, count and settle each complete
 *                           incoming message; in reply mode, send the message
 *                           back to its reply-to address when it has one.
 *   PN_CONNECTION_UNBOUND - release the connection.
 *
 * Failures are reported through check().
 */
void connection_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type)
{
  connection_context_t *cc = connection_context(h);
  bool replying = cc->global->opts->reply;

  switch (type) {
  case PN_LINK_REMOTE_OPEN:
    {
      pn_link_t *link = pn_event_link(event);
      if (pn_link_is_receiver(link)) {
        check(cc->recv_link == NULL, "Multiple incoming links on one connection");
        cc->recv_link = link;
        pn_connection_t *conn = pn_event_connection(event);
        if (cc->global->shutting_down) {
          // Already shutting down: close the connection instead of setting
          // up credit or a reply link.
          pn_connection_close(conn);
          break;
        }
        if (replying) {
          // Set up a reply link and defer granting credit to the incoming link
          pn_connection_t *link_conn = pn_session_connection(pn_link_session(link));
          pn_session_t *ssn = pn_session(link_conn);
          pn_session_open(ssn);
          char name[100]; // prefer a multiplatform uuid generator
          // Bounded formatting so the name can never overrun the buffer.
          snprintf(name, sizeof name, "reply_sender_%d", cc->connection_id);
          cc->reply_link = pn_sender(ssn, name);
          pn_link_open(cc->reply_link);
        }
        else {
          pn_flowcontroller_t *fc = pn_flowcontroller(1024);
          pn_handler_add(h, fc);
          // Drop the local reference now that fc has been added to h.
          pn_decref(fc);
        }
      }
    }
    break;

  case PN_LINK_FLOW:
    {
      if (replying) {
        pn_link_t *reply_link = pn_event_link(event);
        // pn_flowcontroller handles the non-reply case
        check(reply_link == cc->reply_link, "internal error");
        // Grant the sender as much credit as just given to us for replies
        int delta = pn_link_credit(reply_link) - pn_link_credit(cc->recv_link);
        if (delta > 0)
          pn_link_flow(cc->recv_link, delta);
      }
    }
    break;

  case PN_DELIVERY:
    {
      pn_link_t *recv_link = pn_event_link(event);
      pn_delivery_t *dlv = pn_event_delivery(event);
      // Only act on receiver-link deliveries that are no longer partial.
      if (pn_link_is_receiver(recv_link) && !pn_delivery_partial(dlv)) {
        // The first message received starts the statistics.
        if (cc->global->received == 0) statistics_start(cc->global->stats);

        // Make sure the shared buffer can hold the pending bytes, then read them.
        size_t encoded_size = pn_delivery_pending(dlv);
        cc->global->encoded_data = ensure_buffer(cc->global->encoded_data, encoded_size,
                                                 &cc->global->encoded_data_size);
        check(cc->global->encoded_data, "decoding buffer realloc failure");
        ssize_t n = pn_link_recv(recv_link, cc->global->encoded_data, encoded_size);
        check(n == (ssize_t) encoded_size, "message data read fail");

        pn_message_t *msg = cc->global->message;
        int decode_err = pn_message_decode(msg, cc->global->encoded_data, n);
        check(decode_err == 0, "message decode error");
        cc->global->received++;
        pn_delivery_settle(dlv);
        statistics_msg_received(cc->global->stats, msg);

        if (replying) {
          const char *reply_addr = pn_message_get_reply_to(msg);
          if (reply_addr) {
            pn_link_t *rl = cc->reply_link;
            check(pn_link_credit(rl) > 0, "message received without corresponding reply credit");
            LOG("Replying to: %s\n", reply_addr );
            pn_message_set_address(msg, reply_addr);
            pn_message_set_creation_time(msg, msgr_now());

            // The delivery tag is the raw bytes of the running sent counter.
            // Keep it in a real uint64_t and view it as bytes: writing a
            // uint64_t through a pointer into a char array would break the
            // aliasing and alignment rules.
            uint64_t tag = cc->global->sent;
            pn_delivery_t *reply_dlv = pn_delivery(rl, pn_dtag((const char *) &tag, sizeof tag));

            // Re-encode the modified message into the shared buffer; size is
            // the buffer capacity on entry and is updated by the encode call.
            size_t size = cc->global->encoded_data_size;
            int encode_err = pn_message_encode(msg, cc->global->encoded_data, &size);
            check(encode_err == 0, "message encoding error");
            // NOTE(review): the result of pn_link_send is not checked here.
            pn_link_send(rl, cc->global->encoded_data, size);
            pn_delivery_settle(reply_dlv);
            cc->global->sent++;
          }
        }
      }
      // Begin global shutdown once the configured message count is reached.
      if (cc->global->received >= cc->global->opts->msg_count) {
        global_shutdown(cc->global);
      }
    }
    break;

  case PN_CONNECTION_UNBOUND:
    {
      pn_connection_t *conn = pn_event_connection(event);
      pn_connection_release(conn);
    }
    break;

  default:
    break;
  }
}