in c/tools/msgr-send.c [186:322]
/**
 * Entry point of the message sender tool.
 *
 * Parses the command line into an Options_t, configures and starts a
 * messenger, then repeatedly puts one pre-built message (binary body of
 * opts.msg_size zero bytes, reply-to "~") to the addresses in opts.targets.
 * Each message carries the running send counter as its correlation id and
 * the current time as its creation time.
 *
 * The send loop runs until opts.msg_count messages have been put, or forever
 * when opts.msg_count is 0.  When opts.send_batch is non-zero, the loop
 * flushes whenever that many messages are queued: either by draining replies
 * (opts.get_replies set) or by calling pn_messenger_send().
 *
 * @param argc  argument count, forwarded to parse_options()
 * @param argv  argument vector, forwarded to parse_options()
 * @return 0 after a normal shutdown.  Failures are reported through check();
 *         NOTE(review): check() is presumably fatal - confirm its definition.
 */
int main(int argc, char** argv)
{
    Options_t opts;
    Statistics_t stats;
    uint64_t sent = 0;
    uint64_t received = 0;
    int target_index = 0;
    int rc;

    pn_message_t *message = NULL;
    pn_message_t *reply_message = NULL;
    pn_messenger_t *messenger = NULL;

    parse_options( argc, argv, &opts );

    messenger = pn_messenger( opts.name );
    // every call below dereferences the messenger, so fail early if it
    // could not be created
    check(messenger, "failed to allocate a messenger");

    if (opts.certificate) {
        rc = pn_messenger_set_certificate(messenger, opts.certificate);
        check( rc == 0, "Failed to set certificate" );
    }

    if (opts.privatekey) {
        rc = pn_messenger_set_private_key(messenger, opts.privatekey);
        check( rc == 0, "Failed to set private key" );
    }

    if (opts.password) {
        rc = pn_messenger_set_password(messenger, opts.password);
        // the password is released before the status check so it is freed
        // on both the success and the failure path
        free(opts.password);
        opts.password = NULL;   // opts outlives this point; avoid a dangling pointer
        check( rc == 0, "Failed to set password" );
    }

    if (opts.ca_db) {
        rc = pn_messenger_set_trusted_certificates(messenger, opts.ca_db);
        check( rc == 0, "Failed to set trusted CA database" );
    }

    if (opts.outgoing_window) {
        pn_messenger_set_outgoing_window( messenger, opts.outgoing_window );
    }
    pn_messenger_set_timeout( messenger, opts.timeout );

    rc = pn_messenger_start(messenger);
    check(rc == 0, "pn_messenger_start() failed");

    message = pn_message();
    check(message, "failed to allocate a message");
    pn_message_set_reply_to(message, "~");

    // the body is opts.msg_size zero bytes of binary data
    pn_data_t *body = pn_message_body(message);
    char *data = (char *)calloc(1, opts.msg_size);
    // calloc() may legitimately return NULL for a zero-sized request
    check(data || opts.msg_size == 0, "failed to allocate the message body");
    pn_data_put_binary(body, pn_bytes(opts.msg_size, data));
    // NOTE(review): assumes pn_data_put_binary() copies the bytes - confirm
    free(data);

    pn_atom_t id;
    id.type = PN_ULONG;

#if 0
    // TODO: how do we effectively benchmark header processing overhead???
    pn_data_t *props = pn_message_properties(message);
    pn_data_put_map(props);
    pn_data_enter(props);
    //
    //pn_data_put_string(props, pn_bytes(6,  "string"));
    //pn_data_put_string(props, pn_bytes(10, "this is awkward"));
    //
    //pn_data_put_string(props, pn_bytes(4,  "long"));
    pn_data_put_long(props, 12345);
    //
    //pn_data_put_string(props, pn_bytes(9, "timestamp"));
    pn_data_put_timestamp(props, (pn_timestamp_t) 54321);
    pn_data_exit(props);
#endif

    const int get_replies = opts.get_replies;

    if (get_replies) {
        // replies are decoded into a separate message object so the
        // outgoing message can be reused unchanged
        reply_message = pn_message();
        check(reply_message, "failed to allocate a message");
    }

    statistics_start( &stats );
    // a msg_count of zero means "send forever"
    while (!opts.msg_count || (sent < opts.msg_count)) {

        // setup the message to send
        pn_message_set_address(message, opts.targets.addresses[target_index]);
        target_index = NEXT_ADDRESS(opts.targets, target_index);
        id.u.as_ulong = sent;
        pn_message_set_correlation_id( message, id );
        pn_message_set_creation_time( message, msgr_now() );
        pn_messenger_put(messenger, message);
        sent++;

        // flush once a full batch is queued (batching disabled when send_batch == 0)
        if (opts.send_batch && (pn_messenger_outgoing(messenger) >= (int)opts.send_batch)) {
            if (get_replies) {
                while (received < sent) {
                    // this will also transmit any pending sent messages
                    received += process_replies( messenger, reply_message,
                                                 &stats, opts.recv_count );
                }
            } else {
                LOG("Calling pn_messenger_send()\n");
                rc = pn_messenger_send(messenger, -1);
                // a timeout here is tolerated; the final flush after the
                // loop requires full success
                check((rc == 0 || rc == PN_TIMEOUT), "pn_messenger_send() failed");
            }
        }
        check_messenger(messenger);
    }

    LOG("Messages received=%" PRIu64 " sent=%" PRIu64 "\n", received, sent);

    if (get_replies) {
        // wait for the last of the replies
        while (received < sent) {
            int count = process_replies( messenger, reply_message,
                                         &stats, opts.recv_count );
            // an empty round is only acceptable when the timeout is zero
            check( count > 0 || (opts.timeout == 0),
                   "Error: timed out waiting for reply messages\n");
            received += count;
            LOG("Messages received=%" PRIu64 " sent=%" PRIu64 "\n", received, sent);
        }
    } else if (pn_messenger_outgoing(messenger) > 0) {
        LOG("Calling pn_messenger_send()\n");
        rc = pn_messenger_send(messenger, -1);
        check(rc == 0, "pn_messenger_send() failed");
    }

    rc = pn_messenger_stop(messenger);
    check(rc == 0, "pn_messenger_stop() failed");
    check_messenger(messenger);

    statistics_report( &stats, sent, received );

    pn_messenger_free(messenger);
    pn_message_free(message);
    if (reply_message) pn_message_free( reply_message );
    addresses_free( &opts.targets );

    return 0;
}