int pn_messenger_start()

in c/src/messenger/messenger.c [1490:1564]


/* Returns non-zero when an address component is fully specified: it is
 * non-NULL, non-empty and contains no '$' substitution placeholder. */
static int pni_route_part_is_concrete(const char *part)
{
  return part && part[0] != '\0' && !strstr(part, "$");
}

/**
 * Start the messenger, optionally checking its routes up front.
 *
 * When PN_FLAGS_CHECK_ROUTES is set in messenger->flags, each route
 * substitution whose scheme, host and port are all fully specified is
 * resolved with pn_messenger_resolve() and the messenger is worked until
 * the connection attempt settles or an error is recorded. Checking stops
 * at the first failure. No attempt is made to check the name part of the
 * address, as the intent is to fail fast if the addressed host is invalid
 * or unavailable.
 *
 * @param messenger the messenger to start
 * @return PN_ARG_ERR if messenger is NULL; 0 if route checking is disabled
 *         or every checked route succeeded; otherwise the first error code
 *         produced while copying, resolving or working a route.
 */
int pn_messenger_start(pn_messenger_t *messenger)
{
  if (!messenger) return PN_ARG_ERR;

  int error = 0;

  if (messenger->flags & PN_FLAGS_CHECK_ROUTES) {
    pn_list_t *substitutions = pn_list(PN_WEAKREF, 0);
    pn_transform_get_substitutions(messenger->routes, substitutions);
    for (size_t i = 0; i < pn_list_size(substitutions) && error == 0; i++) {
      pn_string_t *substitution = (pn_string_t *)pn_list_get(substitutions, i);
      if (!substitution) continue;

      // Parse a private copy of the substitution; pni_parse() populates
      // the scheme/host/port fields read below.
      pn_address_t addr;
      addr.text = pn_string(NULL);
      error = pn_string_copy(addr.text, substitution);
      if (!error) {
        pni_parse(&addr);
        // Only routes with a concrete scheme, host and port can be checked;
        // anything still containing a '$' placeholder is skipped.
        if (pni_route_part_is_concrete(addr.scheme) &&
            pni_route_part_is_concrete(addr.host) &&
            pni_route_part_is_concrete(addr.port)) {
          pn_string_t *check_addr = pn_string(NULL);
          // ipv6 hosts need to be wrapped in [] within a URI
          if (strstr(addr.host, ":")) {
            pn_string_format(check_addr, "%s://[%s]:%s/", addr.scheme,
                             addr.host, addr.port);
          } else {
            pn_string_format(check_addr, "%s://%s:%s/", addr.scheme,
                             addr.host, addr.port);
          }
          char *name = NULL;
          pn_connection_t *connection = pn_messenger_resolve(
              messenger, pn_string_get(check_addr), &name);
          pn_free(check_addr);
          if (!connection) {
            // Fall back to the I/O layer's error if the messenger itself
            // has not recorded one, then wrap it with connection details.
            if (pn_error_code(messenger->error) == 0) {
              pn_error_copy(messenger->error, pn_io_error(messenger->io));
            }
            pn_error_format(messenger->error, PN_ERR,
                            "CONNECTION ERROR (%s:%s): %s\n",
                            messenger->address.host, messenger->address.port,
                            pn_error_text(messenger->error));
            error = pn_error_code(messenger->error);
          } else {
            // Send and receive outstanding messages until connection
            // completes or an error occurs
            int work = pn_messenger_work(messenger, -1);
            pn_connection_ctx_t *cctx =
                (pn_connection_ctx_t *)pn_connection_get_context(connection);
            while ((work > 0 ||
                    (pn_connection_state(connection) & PN_REMOTE_UNINIT) ||
                    pni_connection_pending(cctx->selectable) != (ssize_t)0) &&
                   pn_error_code(messenger->error) == 0) {
              work = pn_messenger_work(messenger, 0);
            }
            // A timeout from the work call is not itself a failure; in that
            // case report whatever the messenger recorded (possibly 0).
            if (work < 0 && work != PN_TIMEOUT) {
              error = work;
            } else {
              error = pn_error_code(messenger->error);
            }
          }
        }
      }
      // Release the scratch string on every path, including when the copy
      // above failed, so it is never leaked.
      pn_free(addr.text);
    }
    pn_free(substitutions);
  }

  return error;
}