in c/tools/reactor-send.c [165:248]
/**
 * Event callback for the sending side of the tool.
 *
 * Looks up the sender context attached to handler @p h and reacts to the
 * event types below; every other event type is ignored.
 *
 *  - PN_CONNECTION_INIT:        configure and open the connection, a session
 *                               and a sender link named "sender".
 *  - PN_LINK_FLOW:              send messages while the link has credit and
 *                               the configured message count is not reached;
 *                               close link and connection once done (unless
 *                               replies are expected).
 *  - PN_LINK_INIT:              for receiver links, attach a dedicated
 *                               reply-to handler to the link.
 *  - PN_CONNECTION_LOCAL_CLOSE: report the collected statistics.
 *
 * @param h      handler whose sender context is retrieved via sender_context()
 * @param event  the event being dispatched
 * @param type   the type of @p event; selects the action taken
 */
void sender_dispatch(pn_handler_t *h, pn_event_t *event, pn_event_type_t type)
{
  sender_context_t *sc = sender_context(h);
  switch (type) {
  case PN_CONNECTION_INIT:
    {
      pn_connection_t *conn = pn_event_connection(event);
      pn_connection_set_container(conn, sc->container_id);
      pn_connection_set_hostname(conn, sc->hostname);
      pn_connection_open(conn);
      pn_session_t *ssn = pn_session(conn);
      pn_session_open(ssn);
      pn_link_t *snd = pn_sender(ssn, "sender");
      // Only set the terminus addresses when the URL carries a non-empty path.
      const char *path = pn_url_get_path(sc->send_url);
      if (path && strlen(path)) {
        pn_terminus_set_address(pn_link_target(snd), path);
        pn_terminus_set_address(pn_link_source(snd), path);
      }
      pn_link_open(snd);
    }
    break;
  case PN_LINK_FLOW:
    {
      pn_link_t *snd = pn_event_link(event);
      while (pn_link_credit(snd) > 0 && sc->sent < sc->opts->msg_count) {
        // Start the statistics just before the very first message goes out.
        if (sc->sent == 0) {
          statistics_start(sc->stats);
        }
        // The delivery tag is the 8 raw bytes of the running send counter.
        // Copy with memcpy rather than storing through a cast pointer: a
        // char array may not be accessed as uint64_t (strict aliasing) and
        // is not guaranteed to be suitably aligned for it.
        char tag[8];
        uint64_t seq = sc->sent;
        memcpy(tag, &seq, sizeof tag);
        pn_delivery_t *dlv = pn_delivery(snd, pn_dtag(tag, sizeof tag));
        // setup the message to send
        pn_message_t *msg = get_message(sc, true);
        pn_message_set_address(msg, sc->opts->targets.addresses[0]);
        sc->id.u.as_ulong = sc->sent;
        pn_message_set_correlation_id(msg, sc->id);
        pn_message_set_creation_time(msg, msgr_now());
        // size is in/out: buffer capacity going in, encoded length coming out.
        size_t size = sc->encoded_data_size;
        int err = pn_message_encode(msg, sc->encoded_data, &size);
        check(err == 0, "message encoding error");
        pn_link_send(snd, sc->encoded_data, size);
        // Settle locally right after the send; no remote outcome is awaited.
        pn_delivery_settle(dlv);
        sc->sent++;
        return_message(sc, msg);
      }
      // All messages sent and no replies expected: shut the link and the
      // connection down.
      if (sc->sent == sc->opts->msg_count && !sc->opts->get_replies) {
        pn_link_close(snd);
        pn_connection_t *conn = pn_event_connection(event);
        pn_connection_close(conn);
      }
    }
    break;
  case PN_LINK_INIT:
    {
      pn_link_t *link = pn_event_link(event);
      if (pn_link_is_receiver(link)) {
        // Response messages link. Could manage credit and deliveries in this
        // handler but a dedicated handler also works.
        pn_handler_t *replyto = replyto_handler(sc);
        pn_flowcontroller_t *fc = pn_flowcontroller(1024);
        pn_handler_add(replyto, fc);
        pn_decref(fc);
        pn_handshaker_t *handshaker = pn_handshaker();
        pn_handler_add(replyto, handshaker);
        pn_decref(handshaker);
        // Attach the reply-to handler to this link's record, then drop the
        // local reference to it.
        pn_record_t *record = pn_link_attachments(link);
        pn_record_set_handler(record, replyto);
        pn_decref(replyto);
      }
    }
    break;
  case PN_CONNECTION_LOCAL_CLOSE:
    {
      statistics_report(sc->stats, sc->sent, sc->received);
    }
    break;
  default:
    break;
  }
}