in c/src/messenger/messenger.c [1490:1564]
/**
 * Start the messenger, optionally validating its routes first.
 *
 * In the code visible here the only work performed is the optional route
 * check: when PN_FLAGS_CHECK_ROUTES is set in messenger->flags, each route
 * substitution that has a concrete (non-empty, '$'-free) scheme, host and
 * port is resolved to a connection, and the messenger is driven until that
 * connection leaves the remote-uninitialised state, has nothing pending,
 * or an error is recorded. Checking stops at the first failure.
 *
 * @param messenger  the messenger to start; may be NULL
 * @return 0 if route checking is disabled or every checked route succeeds;
 *         PN_ARG_ERR if messenger is NULL; otherwise the first non-zero
 *         error code produced by copying the substitution, by the
 *         messenger's error object, or by pn_messenger_work().
 */
int pn_messenger_start(pn_messenger_t *messenger)
{
  if (!messenger) return PN_ARG_ERR;

  int error = 0;

  // When checking of routes is required we attempt to resolve each route
  // with a substitution that has a defined scheme, address and port. If
  // any of these routes is invalid an appropriate error code will be
  // returned. Currently no attempt is made to check the name part of the
  // address, as the intent here is to fail fast if the addressed host
  // is invalid or unavailable.
  if (messenger->flags & PN_FLAGS_CHECK_ROUTES) {
    pn_list_t *substitutions = pn_list(PN_WEAKREF, 0);
    pn_transform_get_substitutions(messenger->routes, substitutions);

    // The "error == 0" term stops the scan at the first failing route.
    for (size_t i = 0; i < pn_list_size(substitutions) && error == 0; i++) {
      pn_string_t *substitution = (pn_string_t *)pn_list_get(substitutions, i);
      if (substitution) {
        // Parse a private copy of the substitution text.
        pn_address_t addr;
        addr.text = pn_string(NULL);
        error = pn_string_copy(addr.text, substitution);
        if (!error) {
          pni_parse(&addr);

          // Only check routes whose scheme, host and port are all present
          // and contain no '$' (i.e. no unresolved substitution marker).
          if (addr.scheme && strlen(addr.scheme) > 0 &&
              !strstr(addr.scheme, "$") && addr.host && strlen(addr.host) > 0 &&
              !strstr(addr.host, "$") && addr.port && strlen(addr.port) > 0 &&
              !strstr(addr.port, "$")) {
            pn_string_t *check_addr = pn_string(NULL);
            // ipv6 hosts need to be wrapped in [] within a URI
            if (strstr(addr.host, ":")) {
              pn_string_format(check_addr, "%s://[%s]:%s/", addr.scheme,
                               addr.host, addr.port);
            } else {
              pn_string_format(check_addr, "%s://%s:%s/", addr.scheme,
                               addr.host, addr.port);
            }

            // name is a required out-parameter of pn_messenger_resolve();
            // its value is not used by the route check.
            char *name = NULL;
            pn_connection_t *connection = pn_messenger_resolve(
                messenger, pn_string_get(check_addr), &name);
            pn_free(check_addr);

            if (!connection) {
              // If the messenger has no error recorded yet, adopt the I/O
              // layer's error before wrapping it in a connection message.
              if (pn_error_code(messenger->error) == 0) {
                pn_error_copy(messenger->error, pn_io_error(messenger->io));
              }
              pn_error_format(messenger->error, PN_ERR,
                              "CONNECTION ERROR (%s:%s): %s\n",
                              messenger->address.host, messenger->address.port,
                              pn_error_text(messenger->error));
              error = pn_error_code(messenger->error);
            } else {
              // Send and receive outstanding messages until connection
              // completes or an error occurs
              int work = pn_messenger_work(messenger, -1);
              pn_connection_ctx_t *cctx =
                  (pn_connection_ctx_t *)pn_connection_get_context(connection);
              while ((work > 0 ||
                      (pn_connection_state(connection) & PN_REMOTE_UNINIT) ||
                      pni_connection_pending(cctx->selectable) != (ssize_t)0) &&
                     pn_error_code(messenger->error) == 0) {
                work = pn_messenger_work(messenger, 0);
              }

              // A negative result other than PN_TIMEOUT is reported as-is;
              // otherwise report whatever the messenger's error object holds.
              if (work < 0 && work != PN_TIMEOUT) {
                error = work;
              } else {
                error = pn_error_code(messenger->error);
              }
            }
          }
        }
        // Release the working copy on every path, including the case where
        // pn_string_copy() failed (previously leaked).
        pn_free(addr.text);
      }
    }
    pn_free(substitutions);
  }

  return error;
}