in c/tools/msgr-recv.c [150:284]
int main(int argc, char** argv)
{
Options_t opts;
Statistics_t stats;
uint64_t sent = 0;
uint64_t received = 0;
int forwarding_index = 0;
int rc;
pn_message_t *message;
pn_messenger_t *messenger;
parse_options( argc, argv, &opts );
const int forward = opts.forwarding_targets.count != 0;
message = pn_message();
messenger = pn_messenger( opts.name );
/* load the various command line options if they're set */
if (opts.certificate) {
rc = pn_messenger_set_certificate(messenger, opts.certificate);
check_messenger(messenger);
check( rc == 0, "Failed to set certificate" );
}
if (opts.privatekey) {
rc = pn_messenger_set_private_key(messenger, opts.privatekey);
check_messenger(messenger);
check( rc == 0, "Failed to set private key" );
}
if (opts.password) {
rc = pn_messenger_set_password(messenger, opts.password);
free(opts.password);
check_messenger(messenger);
check( rc == 0, "Failed to set password" );
}
if (opts.ca_db) {
rc = pn_messenger_set_trusted_certificates(messenger, opts.ca_db);
check_messenger(messenger);
check( rc == 0, "Failed to set trusted CA database" );
}
if (opts.incoming_window) {
// RAFI: seems to cause receiver to hang:
pn_messenger_set_incoming_window( messenger, opts.incoming_window );
}
pn_messenger_set_timeout( messenger, opts.timeout );
pn_messenger_start(messenger);
check_messenger(messenger);
int i;
for (i = 0; i < opts.subscriptions.count; i++) {
pn_messenger_subscribe(messenger, opts.subscriptions.addresses[i]);
check_messenger(messenger);
LOG("Subscribing to '%s'\n", opts.subscriptions.addresses[i]);
}
// hack to let test scripts know when the receivers are ready (so
// that the senders may be started)
if (opts.ready_text) {
fprintf(stdout, "%s\n", opts.ready_text);
fflush(stdout);
}
while (!opts.msg_count || received < opts.msg_count) {
LOG("Calling pn_messenger_recv(%d)\n", opts.recv_count);
rc = pn_messenger_recv(messenger, opts.recv_count);
check_messenger(messenger);
check(rc == 0 || (opts.timeout == 0 && rc == PN_TIMEOUT), "pn_messenger_recv() failed");
// start the timer only after receiving the first msg
if (received == 0) statistics_start( &stats );
LOG("Messages on incoming queue: %d\n", pn_messenger_incoming(messenger));
while (pn_messenger_incoming(messenger)) {
pn_messenger_get(messenger, message);
check_messenger(messenger);
received++;
// TODO: header decoding?
// uint64_t id = pn_message_get_correlation_id( message ).u.as_ulong;
statistics_msg_received( &stats, message );
if (opts.reply) {
const char *reply_addr = pn_message_get_reply_to( message );
if (reply_addr) {
LOG("Replying to: %s\n", reply_addr );
pn_message_set_address( message, reply_addr );
pn_message_set_creation_time( message, msgr_now() );
pn_messenger_put(messenger, message);
sent++;
}
}
if (forward) {
const char *forward_addr = opts.forwarding_targets.addresses[forwarding_index];
forwarding_index = NEXT_ADDRESS(opts.forwarding_targets, forwarding_index);
LOG("Forwarding to: %s\n", forward_addr );
pn_message_set_address( message, forward_addr );
pn_message_set_reply_to( message, NULL ); // else points to origin sender
pn_message_set_creation_time( message, msgr_now() );
pn_messenger_put(messenger, message);
sent++;
}
}
LOG("Messages received=%" PRIu64 " sent=%" PRIu64 "\n", received, sent);
}
// this will flush any pending sends
if (pn_messenger_outgoing(messenger) > 0) {
LOG("Calling pn_messenger_send()\n");
rc = pn_messenger_send(messenger, -1);
check_messenger(messenger);
check(rc == 0, "pn_messenger_send() failed");
}
rc = pn_messenger_stop(messenger);
check(rc == 0, "pn_messenger_stop() failed");
check_messenger(messenger);
statistics_report( &stats, sent, received );
pn_messenger_free(messenger);
pn_message_free(message);
addresses_free( &opts.subscriptions );
addresses_free( &opts.forwarding_targets );
return 0;
}