ssize_t pn_io_layer_input_autodetect()

in c/src/core/transport.c [272:366]


/*
 * Input handler for the protocol-autodetect I/O layer.
 *
 * Sniffs the header at the start of `bytes` and, when the detected protocol is
 * still permitted by transport->allowed_layers, replaces
 * transport->io_layers[layer] with the matching layer. For SSL and SASL the
 * autodetect layer is re-installed at io_layers[layer+1].
 *
 * transport  transport whose layer table and allowed/present masks are updated
 * layer      index of this layer within transport->io_layers
 * bytes      pending input
 * available  number of bytes readable at `bytes`
 *
 * Returns
 *   PN_EOS  the input side is closed and no bytes are left;
 *   the result of ssl_layer.process_input() when a raw SSL header is seen;
 *   8       an AMQP-family header was accepted, or an AMQP 1.0 header was
 *           rejected for policy reasons (error already recorded);
 *   0       the sniffer reported insufficient data and the input is still
 *           open, or a framing error was recorded and the header-error layer
 *           was installed.
 */
ssize_t pn_io_layer_input_autodetect(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available)
{
  /* Value returned after an AMQP-family header is recognised; presumably the
   * number of header bytes consumed (AMQP 1.0 headers are 8 octets) --
   * TODO confirm against the layer contract. */
  enum { HEADER_SIZE = 8 };

  const bool input_closed = transport->tail_closed;

  /* Closed input with nothing buffered: there is no header to sniff. */
  if (input_closed && available == 0) {
    pn_do_error(transport, "amqp:connection:framing-error", "No protocol header found (connection aborted)");
    pn_set_error_layer(transport);
    return PN_EOS;
  }

  const pni_protocol_type_t detected = pni_sniff_header(bytes, available);
  PN_LOG(&transport->logger, PN_SUBSYSTEM_IO, PN_LEVEL_DEBUG, "%s detected", pni_protocol_name(detected));

  /* Set by every path that leaves the switch via `break`. */
  const char *reason;

  switch (detected) {
  case PNI_PROTOCOL_SSL:
    if ((transport->allowed_layers & LAYER_SSL) == 0) {
      reason = "SSL protocol header not allowed (maybe detected twice)";
      break;
    }
    transport->present_layers |= LAYER_SSL;
    /* From here on only AMQP 1.0 or AMQP SASL headers remain acceptable. */
    transport->allowed_layers &= LAYER_AMQP1 | LAYER_AMQPSASL;
    if (!transport->ssl) {
      pn_ssl(transport);
    }
    transport->io_layers[layer] = &ssl_layer;
    transport->io_layers[layer+1] = &pni_autodetect_layer;
    /* The same bytes are handed straight to the SSL layer. */
    return ssl_layer.process_input(transport, layer, bytes, available);

  case PNI_PROTOCOL_AMQP_SSL:
    if ((transport->allowed_layers & LAYER_AMQPSSL) == 0) {
      reason = "AMQP SSL protocol header not allowed (maybe detected twice)";
      break;
    }
    transport->present_layers |= LAYER_AMQPSSL;
    transport->allowed_layers &= LAYER_AMQP1 | LAYER_AMQPSASL;
    if (!transport->ssl) {
      pn_ssl(transport);
    }
    transport->io_layers[layer] = &ssl_layer;
    transport->io_layers[layer+1] = &pni_autodetect_layer;
    return HEADER_SIZE;

  case PNI_PROTOCOL_AMQP_SASL: {
    if ((transport->allowed_layers & LAYER_AMQPSASL) == 0) {
      reason = "AMQP SASL protocol header not allowed (maybe detected twice)";
      break;
    }
    transport->present_layers |= LAYER_AMQPSASL;
    /* After SASL, an AMQP 1.0 or AMQP SSL header may still follow. */
    transport->allowed_layers &= LAYER_AMQP1 | LAYER_AMQPSSL;
    if (!transport->sasl) {
      pn_sasl(transport);
    }
    transport->io_layers[layer] = &sasl_write_header_layer;
    transport->io_layers[layer+1] = &pni_autodetect_layer;
    PN_LOG(&transport->logger, PN_SUBSYSTEM_SASL, PN_LEVEL_FRAME, "  <- %s", "SASL");
    /* The SSL accessors are given the transport through a pn_ssl_t* cast;
     * their results are passed on to SASL as the external security info. */
    pn_ssl_t *as_ssl = (pn_ssl_t*)transport;
    pni_sasl_set_external_security(transport, pn_ssl_get_ssf(as_ssl), pn_ssl_get_remote_subject(as_ssl));
    return HEADER_SIZE;
  }

  case PNI_PROTOCOL_AMQP1:
    if ((transport->allowed_layers & LAYER_AMQP1) == 0) {
      reason = "AMQP1.0 protocol header not allowed (maybe detected twice)";
      break;
    }
    transport->present_layers |= LAYER_AMQP1;
    /* No further protocol headers are accepted after AMQP 1.0. */
    transport->allowed_layers = LAYER_NONE;

    /* Policy checks: authentication is tested before encryption. */
    if (transport->auth_required && !pn_transport_is_authenticated(transport)) {
      pn_do_error(transport, "amqp:connection:policy-error",
                  "Client skipped authentication - forbidden");
      pn_set_error_layer(transport);
      return HEADER_SIZE;
    }
    if (transport->encryption_required && !pn_transport_is_encrypted(transport)) {
      pn_do_error(transport, "amqp:connection:policy-error",
                  "Client connection unencrypted - forbidden");
      pn_set_error_layer(transport);
      return HEADER_SIZE;
    }

    transport->io_layers[layer] = &amqp_write_header_layer;
    PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, "  <- %s", "AMQP");
    return HEADER_SIZE;

  case PNI_PROTOCOL_INSUFFICIENT:
    /* Not enough data to decide yet: report 0 while the input is still open. */
    if (!input_closed) {
      return 0;
    }
    reason = "End of input stream before protocol detection";
    break;

  case PNI_PROTOCOL_AMQP_OTHER:
    reason = "Incompatible AMQP connection detected";
    break;

  case PNI_PROTOCOL_UNKNOWN:
  default:
    reason = "Unknown protocol detected";
    break;
  }

  /* Shared failure tail: install the header-error layer and report a framing
   * error that includes a quoted rendering of the offending input. */
  transport->io_layers[layer] = &pni_header_error_layer;

  char quoted[1024];
  pn_quote_data(quoted, sizeof quoted, bytes, available);

  const char *aborted_note = input_closed ? " (connection aborted)" : "";
  pn_do_error(transport, "amqp:connection:framing-error",
              "%s: '%s'%s", reason, quoted, aborted_note);
  return 0;
}