public void create()

in qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java [412:498]


    /**
     * Starts creation of the given resource.
     * <p>
     * The closed/failed and connected checks are performed on the calling thread first;
     * the remaining work is handed to {@code serializer}. Inside that task the
     * closed/failed check is repeated, the resource is visited so that the create
     * operation matching its type is started, and {@code pumpToProtonTransport} is
     * then called with the request. Anything thrown inside the task is converted via
     * {@code ProviderExceptionSupport.createNonFatalOrPassthrough} and delivered to
     * {@code request.onFailure} rather than propagated.
     *
     * @param resource the resource to create; it selects the operation through its {@code visit} call
     * @param request  the result handle passed on to the selected create operation
     * @throws ProviderException declared for the up-front state checks (presumably raised
     *         when the provider is closed, failed or not connected - confirm against those helpers)
     */
    public void create(final JmsResource resource, final AsyncResult request) throws ProviderException {
        checkClosedOrFailed();
        checkConnected();

        serializer.execute(() -> {
            try {
                // State may have changed between scheduling and execution, so check again.
                checkClosedOrFailed();

                final JmsResourceVistor creator = new JmsResourceVistor() {
                    @Override
                    public void processSessionInfo(JmsSessionInfo info) throws Exception {
                        connection.createSession(info, request);
                    }

                    @Override
                    public void processProducerInfo(JmsProducerInfo info) throws Exception {
                        final AmqpSession owner = connection.getSession(info.getParentId());
                        owner.createProducer(info, request);
                    }

                    @Override
                    public void processConsumerInfo(JmsConsumerInfo info) throws Exception {
                        // Connection consumers use the connection's own session rather than a parent session.
                        final AmqpSession owner = info.isConnectionConsumer()
                            ? connection.getConnectionSession()
                            : connection.getSession(info.getParentId());

                        owner.createConsumer(info, request);
                    }

                    @Override
                    public void processConnectionInfo(JmsConnectionInfo info) throws Exception {
                        AmqpProvider.this.connectionInfo = info;

                        final AmqpConnectionBuilder connectionBuilder =
                            new AmqpConnectionBuilder(AmqpProvider.this, info);

                        // Wrap the caller's request so that only the first success or failure is forwarded.
                        connectionRequest = new AsyncResult() {
                            private final AtomicBoolean notified = new AtomicBoolean();

                            @Override
                            public void onSuccess() {
                                if (!notified.compareAndSet(false, true)) {
                                    return;
                                }

                                fireConnectionEstablished();
                                request.onSuccess();
                            }

                            @Override
                            public void onFailure(ProviderException cause) {
                                if (!notified.compareAndSet(false, true)) {
                                    return;
                                }

                                request.onFailure(cause);
                            }

                            @Override
                            public boolean isComplete() {
                                return request.isComplete();
                            }
                        };

                        connectionBuilder.buildResource(connectionRequest);
                    }

                    @Override
                    public void processDestination(JmsTemporaryDestination target) throws Exception {
                        // Non-temporary destinations need no remote work; complete immediately.
                        if (!target.isTemporary()) {
                            request.onSuccess();
                            return;
                        }

                        connection.createTemporaryDestination(target, request);
                    }

                    @Override
                    public void processTransactionInfo(JmsTransactionInfo info) throws Exception {
                        final AmqpSession owner = connection.getSession(info.getSessionId());
                        owner.begin(info.getId(), request);
                    }
                };

                resource.visit(creator);
                pumpToProtonTransport(request);
            } catch (Throwable error) {
                request.onFailure(ProviderExceptionSupport.createNonFatalOrPassthrough(error));
            }
        });
    }