in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java [412:498]
public void create(final JmsResource resource, final AsyncResult request) throws ProviderException {
checkClosedOrFailed();
checkConnected();
serializer.execute(() -> {
try {
checkClosedOrFailed();
resource.visit(new JmsResourceVistor() {
@Override
public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
connection.createSession(sessionInfo, request);
}
@Override
public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
AmqpSession session = connection.getSession(producerInfo.getParentId());
session.createProducer(producerInfo, request);
}
@Override
public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
final AmqpSession session;
if (consumerInfo.isConnectionConsumer()) {
session = connection.getConnectionSession();
} else {
session = connection.getSession(consumerInfo.getParentId());
}
session.createConsumer(consumerInfo, request);
}
@Override
public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
AmqpProvider.this.connectionInfo = connectionInfo;
AmqpConnectionBuilder builder = new AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
connectionRequest = new AsyncResult() {
AtomicBoolean signalled = new AtomicBoolean();
@Override
public void onSuccess() {
if (signalled.compareAndSet(false, true)) {
fireConnectionEstablished();
request.onSuccess();
}
}
@Override
public void onFailure(ProviderException result) {
if (signalled.compareAndSet(false, true)) {
request.onFailure(result);
}
}
@Override
public boolean isComplete() {
return request.isComplete();
}
};
builder.buildResource(connectionRequest);
}
@Override
public void processDestination(JmsTemporaryDestination destination) throws Exception {
if (destination.isTemporary()) {
connection.createTemporaryDestination(destination, request);
} else {
request.onSuccess();
}
}
@Override
public void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception {
AmqpSession session = connection.getSession(transactionInfo.getSessionId());
session.begin(transactionInfo.getId(), request);
}
});
pumpToProtonTransport(request);
} catch (Throwable t) {
request.onFailure(ProviderExceptionSupport.createNonFatalOrPassthrough(t));
}
});
}