in c/src/core/transport.c [272:366]
/*
 * Input handler for the autodetect IO layer.
 *
 * Classifies the pending input with pni_sniff_header() and, depending on the
 * detected protocol header, installs the matching handler in
 * transport->io_layers[layer] (and re-arms autodetection in the next slot when
 * a further header may still follow).
 *
 * transport  transport whose layer table and layer bitmasks are updated
 * layer      index of the io_layers slot being resolved
 * bytes      pending input
 * available  number of bytes at 'bytes'
 *
 * Returns:
 *   - PN_EOS when the tail is already closed and no bytes are pending;
 *   - the result of ssl_layer.process_input() when PNI_PROTOCOL_SSL is accepted;
 *   - 8 once an AMQP, AMQP-SSL or AMQP-SASL header has been handled, including
 *     the case where an AMQP header is refused by the authentication or
 *     encryption policy;
 *   - 0 when more input is needed, or after a framing error has been raised and
 *     pni_header_error_layer installed.
 */
ssize_t pn_io_layer_input_autodetect(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available)
{
  /* Value returned after an AMQP-family header has been handled; presumably the
   * number of header bytes consumed -- confirm against the IO layer contract. */
  const ssize_t header_handled = 8;
  const bool input_closed = transport->tail_closed;

  /* Closed input with nothing buffered: there will never be a header. */
  if (input_closed && available == 0) {
    pn_do_error(transport, "amqp:connection:framing-error", "No protocol header found (connection aborted)");
    pn_set_error_layer(transport);
    return PN_EOS;
  }

  const pni_protocol_type_t detected = pni_sniff_header(bytes, available);
  PN_LOG(&transport->logger, PN_SUBSYSTEM_IO, PN_LEVEL_DEBUG, "%s detected", pni_protocol_name(detected));

  /* Description used by the shared failure path below. It starts as the
   * catch-all message and is overwritten by the more specific cases. */
  const char *reason = "Unknown protocol detected";

  switch (detected) {
  case PNI_PROTOCOL_SSL:
    if (transport->allowed_layers & LAYER_SSL) {
      transport->present_layers |= LAYER_SSL;
      transport->allowed_layers &= LAYER_AMQP1 | LAYER_AMQPSASL;
      if (!transport->ssl) {
        pn_ssl(transport);
      }
      transport->io_layers[layer] = &ssl_layer;
      transport->io_layers[layer+1] = &pni_autodetect_layer;
      /* Hand the very same input straight to the SSL layer. */
      return ssl_layer.process_input(transport, layer, bytes, available);
    }
    reason = "SSL protocol header not allowed (maybe detected twice)";
    break;

  case PNI_PROTOCOL_AMQP_SSL:
    if (transport->allowed_layers & LAYER_AMQPSSL) {
      transport->present_layers |= LAYER_AMQPSSL;
      transport->allowed_layers &= LAYER_AMQP1 | LAYER_AMQPSASL;
      if (!transport->ssl) {
        pn_ssl(transport);
      }
      transport->io_layers[layer] = &ssl_layer;
      transport->io_layers[layer+1] = &pni_autodetect_layer;
      return header_handled;
    }
    reason = "AMQP SSL protocol header not allowed (maybe detected twice)";
    break;

  case PNI_PROTOCOL_AMQP_SASL:
    if (transport->allowed_layers & LAYER_AMQPSASL) {
      transport->present_layers |= LAYER_AMQPSASL;
      transport->allowed_layers &= LAYER_AMQP1 | LAYER_AMQPSSL;
      if (!transport->sasl) {
        pn_sasl(transport);
      }
      transport->io_layers[layer] = &sasl_write_header_layer;
      transport->io_layers[layer+1] = &pni_autodetect_layer;
      PN_LOG(&transport->logger, PN_SUBSYSTEM_SASL, PN_LEVEL_FRAME, " <- %s", "SASL");
      /* Pass the SSL-derived ssf and remote subject on to SASL. */
      pni_sasl_set_external_security(transport,
                                     pn_ssl_get_ssf((pn_ssl_t*)transport),
                                     pn_ssl_get_remote_subject((pn_ssl_t*)transport));
      return header_handled;
    }
    reason = "AMQP SASL protocol header not allowed (maybe detected twice)";
    break;

  case PNI_PROTOCOL_AMQP1:
    if (transport->allowed_layers & LAYER_AMQP1) {
      transport->present_layers |= LAYER_AMQP1;
      /* No further header is accepted once the AMQP header has been seen. */
      transport->allowed_layers = LAYER_NONE;

      /* Policy checks: authentication first, then encryption. */
      if (transport->auth_required && !pn_transport_is_authenticated(transport)) {
        pn_do_error(transport, "amqp:connection:policy-error",
                    "Client skipped authentication - forbidden");
        pn_set_error_layer(transport);
        return header_handled;
      }
      if (transport->encryption_required && !pn_transport_is_encrypted(transport)) {
        pn_do_error(transport, "amqp:connection:policy-error",
                    "Client connection unencrypted - forbidden");
        pn_set_error_layer(transport);
        return header_handled;
      }

      transport->io_layers[layer] = &amqp_write_header_layer;
      PN_LOG(&transport->logger, PN_SUBSYSTEM_AMQP, PN_LEVEL_FRAME, " <- %s", "AMQP");
      return header_handled;
    }
    reason = "AMQP1.0 protocol header not allowed (maybe detected twice)";
    break;

  case PNI_PROTOCOL_INSUFFICIENT:
    /* Too few bytes to decide: wait for more unless the input has ended. */
    if (!input_closed) {
      return 0;
    }
    reason = "End of input stream before protocol detection";
    break;

  case PNI_PROTOCOL_AMQP_OTHER:
    reason = "Incompatible AMQP connection detected";
    break;

  case PNI_PROTOCOL_UNKNOWN:
  default:
    /* Keep the catch-all reason set at initialisation. */
    break;
  }

  /* Shared failure path: install the header-error layer and report the
   * offending bytes in quoted form. */
  transport->io_layers[layer] = &pni_header_error_layer;

  char preview[1024];
  pn_quote_data(preview, sizeof preview, bytes, available);

  const char *suffix = input_closed ? " (connection aborted)" : "";
  pn_do_error(transport, "amqp:connection:framing-error",
              "%s: '%s'%s", reason, preview, suffix);
  return 0;
}