in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java [182:319]
/**
 * Configures the transport, connects it to the remote location and then blocks the
 * caller until the connect request has completed or the connect timeout has elapsed.
 * <p>
 * Transport options (thread factory, listener, max frame size, proxy handler supplier,
 * HTTP headers) are applied before {@code transport.connect(...)} is called. Optional
 * overrides are read from the extension map of the given connection info. The Proton
 * transport/connection setup, and the SASL client setup when {@code saslLayer} is
 * enabled, happen inside the initialization routine passed to the transport.
 *
 * @param connectionInfo
 *        the connection information; supplies the extension map, the owning connection
 *        handed to each extension, and the connect timeout.
 *
 * @throws ProviderException
 *         a {@link ProviderOperationTimedOutException} if the connect request does not
 *         complete within the configured connect timeout. NOTE(review): failures recorded
 *         on the connect request are presumably rethrown by {@code sync} - confirm against
 *         ProviderFuture.
 * @throws IllegalStateException
 *         if the serializer has already been assigned, i.e. connect was called before.
 */
public void connect(final JmsConnectionInfo connectionInfo) throws ProviderException {
// NOTE(review): presumably rejects the call when this provider is already closed or
// failed - confirm against checkClosedOrFailed().
checkClosedOrFailed();
// The serializer is only assigned by the transport.connect(...) call below, so a
// non-null value means this method has already been invoked.
if (serializer != null) {
throw new IllegalStateException("Connect cannot be called more than once");
}
// Completed via onSuccess/onFailure below (or through the stored connectionRequest
// field) and awaited at the end of this method.
final ProviderFuture connectRequest = futureFactory.createFuture();
// Configure Transport prior to initialization at which point configuration is set and
// cannot be updated. All further interaction should take place on the serializer for
// thread safety.
// The thread name embeds a provider sequence number and the remote scheme, host and port.
ThreadFactory transportThreadFactory = new QpidJMSThreadFactory(
"AmqpProvider :(" + PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
remoteURI.getScheme() + "://" + remoteURI.getHost() + ":" + remoteURI.getPort() + "]", true);
transport.setThreadFactory(transportThreadFactory);
transport.setTransportListener(AmqpProvider.this);
transport.setMaxFrameSize(maxFrameSize);
// Optional SSLContext override: taken from the SSL_CONTEXT extension when one is
// registered, otherwise null is passed to transport.connect(...).
final SSLContext sslContextOverride;
if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.SSL_CONTEXT)) {
sslContextOverride =
(SSLContext) connectionInfo.getExtensionMap().get(
JmsConnectionExtensions.SSL_CONTEXT).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
} else {
sslContextOverride = null;
}
// Optional proxy handler supplier: only installed on the transport options when the
// extension is registered and returns a non-null supplier.
if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER)) {
Supplier<?> proxyHandlerSupplier = (Supplier<?>) connectionInfo.getExtensionMap().get(
JmsConnectionExtensions.PROXY_HANDLER_SUPPLIER).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
if (proxyHandlerSupplier != null) {
transport.getTransportOptions().setProxyHandlerSupplier(proxyHandlerSupplier);
}
}
// Optional HTTP headers: when the extension returns a non-null map its entries are
// added to the headers already held by the transport options.
if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE)) {
@SuppressWarnings({ "unchecked" })
Map<String, String> headers = (Map<String, String>)
connectionInfo.getExtensionMap().get(
JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
if (headers != null) {
transport.getTransportOptions().getHttpHeaders().putAll(headers);
}
}
// Anything thrown while connecting is not propagated directly; it is recorded on
// connectRequest in the catch block and surfaces through the sync call further down.
try {
// The lambda is the initialization routine handed to the transport.
// NOTE(review): presumably the transport runs it on the serializer thread as part of
// connecting - confirm against the Transport implementation.
serializer = transport.connect(() -> {
// Store the connection info and the pending request on the provider so other
// provider code can reach them through these fields.
this.connectionInfo = connectionInfo;
this.connectionRequest = connectRequest;
protonTransport.setEmitFlowEventOnSend(false);
try {
((TransportInternal) protonTransport).setUseReadOnlyOutputBuffer(false);
} catch (NoSuchMethodError nsme) {
// using a version at runtime where the optimisation isn't available, ignore
LOG.trace("Proton output buffer optimisation unavailable");
}
// Frame size limits are only applied when a positive max frame size is configured.
if (getMaxFrameSize() > 0) {
protonTransport.setMaxFrameSize(getMaxFrameSize());
protonTransport.setOutboundFrameSizeLimit(getMaxFrameSize());
}
protonTransport.setChannelMax(getChannelMax());
protonTransport.setIdleTimeout(idleTimeout);
// Bind the Proton connection to the transport and register the event collector.
protonTransport.bind(protonConnection);
protonConnection.collect(protonCollector);
if (saslLayer) {
Sasl sasl = protonTransport.sasl();
sasl.client();
// Remote hostname selection: a null vhost falls back to the remote URI host,
// an empty vhost results in no hostname (null) being set, and any other
// vhost value is used as-is.
String hostname = getVhost();
if (hostname == null) {
hostname = remoteURI.getHost();
} else if (hostname.isEmpty()) {
hostname = null;
}
sasl.setRemoteHostname(hostname);
// Each client-side SASL event is delegated to the authenticator and followed
// by a check of the resulting authentication state.
sasl.setListener(new SaslListener() {
@Override
public void onSaslMechanisms(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
authenticator.handleSaslMechanisms(sasl, transport);
checkSaslAuthenticationState();
}
@Override
public void onSaslChallenge(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
authenticator.handleSaslChallenge(sasl, transport);
checkSaslAuthenticationState();
}
@Override
public void onSaslOutcome(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
authenticator.handleSaslOutcome(sasl, transport);
checkSaslAuthenticationState();
}
@Override
public void onSaslInit(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
// Server only event
}
@Override
public void onSaslResponse(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
// Server only event
}
});
// Assigned after the listener is registered; the listener callbacks above read
// this field at the time they are invoked.
authenticator = new AmqpSaslAuthenticator((remoteMechanisms) -> findSaslMechanism(remoteMechanisms));
}
}, sslContextOverride);
// Once connected pump the transport to write the header and respond to any
// data that arrived at connect such as pipelined Header etc
serializer.execute(() -> pumpToProtonTransport());
// Without a SASL layer the request is marked successful right here.
// NOTE(review): with SASL enabled the request is presumably completed from the
// authentication state check (checkSaslAuthenticationState) - confirm.
if (!saslLayer) {
connectRequest.onSuccess();
}
} catch (Throwable t) {
// Fail the pending request with the throwable converted by ProviderExceptionSupport.
connectRequest.onFailure(ProviderExceptionSupport.createOrPassthroughFatal(t));
}
// Block the caller until the request completes: bounded by the connect timeout
// (in milliseconds) unless the timeout is configured as INFINITE.
if (connectionInfo.getConnectTimeout() != JmsConnectionInfo.INFINITE) {
if (!connectRequest.sync(connectionInfo.getConnectTimeout(), TimeUnit.MILLISECONDS)) {
throw new ProviderOperationTimedOutException("Timed out while waiting to connect");
}
} else {
connectRequest.sync();
}
}