public void createSender()

in activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java [258:412]


    /**
     * Creates an {@link AmqpSender} for the given Proton sender link, builds the
     * {@link ConsumerInfo} that describes it and sends that ConsumerInfo to the
     * broker through the connection.
     * <p>
     * The destination is resolved in one of three ways depending on the remote Source:
     * <ul>
     * <li>no remote Source: a subscription is looked up using the link name and, when
     *     found, a new Source describing it is built; otherwise the sender is closed
     *     with {@code AmqpError.NOT_FOUND}.</li>
     * <li>dynamic Source: a temporary destination is created through the connection and
     *     a close action is registered on the sender to delete it again.</li>
     * <li>any other Source: the destination is created from the Source; a temporary
     *     destination with no connection id is rejected with
     *     {@code AmqpError.INVALID_FIELD}.</li>
     * </ul>
     * The sender is opened or closed from the response handler once the broker has
     * answered the ConsumerInfo. An {@link AmqpProtocolException} raised while the link
     * is being configured is handled here by closing the sender with the matching error
     * condition.
     *
     * @param protonSender
     *        the Proton sender link for which a sender should be created.
     *
     * @throws Exception if one of the invoked operations fails with anything other than
     *         an {@link AmqpProtocolException}.
     */
    public void createSender(final Sender protonSender) throws Exception {
        org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) protonSender.getRemoteSource();

        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
        final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo);

        LOG.debug("opening new sender {} on link: {}", consumerInfo.getConsumerId(), protonSender.getName());

        try {
            // Filters that were recognized; only these are placed on the Source set on the link.
            final Map<Symbol, Object> supportedFilters = new HashMap<>();
            protonSender.setContext(sender);

            boolean noLocal = false;
            String selector = null;

            if (source != null) {
                Map.Entry<Symbol, DescribedType> filter = findFilter(source.getFilter(), JMS_SELECTOR_FILTER_IDS);
                if (filter != null) {
                    selector = filter.getValue().getDescribed().toString();
                    // Validate the Selector before anything is sent to the broker.
                    try {
                        SelectorParser.parse(selector);
                    } catch (InvalidSelectorException e) {
                        sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
                        return;
                    }

                    supportedFilters.put(filter.getKey(), filter.getValue());
                }

                filter = findFilter(source.getFilter(), NO_LOCAL_FILTER_IDS);
                if (filter != null) {
                    noLocal = true;
                    supportedFilters.put(filter.getKey(), filter.getValue());
                }
            }

            ActiveMQDestination destination;
            if (source == null) {
                // Attempt to recover previous subscription
                ConsumerInfo storedInfo = connection.lookupSubscription(protonSender.getName());
                if (storedInfo == null) {
                    sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName()));
                    return;
                }

                destination = storedInfo.getDestination();

                source = new org.apache.qpid.proton.amqp.messaging.Source();
                source.setAddress(destination.getQualifiedName());
                source.setDurable(TerminusDurability.UNSETTLED_STATE);
                source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
                source.setDistributionMode(COPY);

                // The recovered selector / no-local settings are advertised to the peer through
                // the Source filter map, so they must also be applied to the ConsumerInfo sent
                // below; otherwise the consumer would be registered with no selector and
                // noLocal == false while the Source claims the opposite.
                if (storedInfo.isNoLocal()) {
                    noLocal = true;
                    supportedFilters.put(NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
                }

                String storedSelector = storedInfo.getSelector();
                if (storedSelector != null && !storedSelector.trim().equals("")) {
                    selector = storedSelector;
                    supportedFilters.put(JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(storedSelector));
                }
            } else if (source.getDynamic()) {
                destination = connection.createTemporaryDestination(protonSender, source.getCapabilities());

                Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
                dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());

                // Currently we only support temporary destinations with delete on close lifetime policy.
                source = new org.apache.qpid.proton.amqp.messaging.Source();
                source.setAddress(destination.getQualifiedName());
                source.setCapabilities(AmqpSupport.getDestinationTypeSymbol(destination));
                source.setDynamic(true);
                source.setDynamicNodeProperties(dynamicNodeProperties);

                // Remove the temporary destination again when the sender is closed.
                sender.addCloseAction(new Runnable() {

                    @Override
                    public void run() {
                        connection.deleteTemporaryDestination((ActiveMQTempDestination) sender.getDestination());
                    }
                });
            } else {
                destination = createDestination(source);
                if (destination.isTemporary()) {
                    String connectionId = ((ActiveMQTempDestination) destination).getConnectionId();
                    if (connectionId == null) {
                        throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), "Not a broker created temp destination");
                    }
                }
            }

            // Report back only the filters that were recognized, or none at all.
            source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
            protonSender.setSource(source);

            int senderCredit = protonSender.getRemoteCredit();

            // Allows the options on the destination to configure the consumerInfo
            if (destination.getOptions() != null) {
                Map<String, Object> options = IntrospectionSupport.extractProperties(
                    new HashMap<String, Object>(destination.getOptions()), "consumer.");
                IntrospectionSupport.setProperties(consumerInfo, options);
                // Anything still present in the map is reported as an option that could not be set.
                if (options.size() > 0) {
                    String msg = "There are " + options.size()
                        + " consumer options that couldn't be set on the consumer."
                        + " Check the options are spelled correctly."
                        + " Unknown parameters=[" + options + "]."
                        + " This consumer cannot be started.";
                    LOG.warn(msg);
                    throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), msg);
                }
            }

            consumerInfo.setSelector(selector);
            consumerInfo.setNoRangeAcks(true);
            consumerInfo.setDestination(destination);
            // A negative remote credit value is clamped to a prefetch of zero.
            consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
            consumerInfo.setDispatchAsync(true);
            consumerInfo.setNoLocal(noLocal);

            // A COPY distribution mode on a queue is mapped to a browser consumer.
            if (source.getDistributionMode() == COPY && destination.isQueue()) {
                consumerInfo.setBrowser(true);
            }

            // Durable sources on a topic use the link name as the subscription name.
            if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
                 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
                consumerInfo.setSubscriptionName(protonSender.getName());
            }

            connection.sendToActiveMQ(consumerInfo, new ResponseHandler() {
                @Override
                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                    if (response.isException()) {
                        // Translate the broker side failure into an AMQP error condition.
                        ErrorCondition error = null;
                        Throwable exception = ((ExceptionResponse) response).getException();
                        if (exception instanceof SecurityException) {
                            error = new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage());
                        } else if (exception instanceof InvalidSelectorException) {
                            error = new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage());
                        } else {
                            error = new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
                        }

                        sender.close(error);
                    } else {
                        sender.open();
                    }
                    pumpProtonToSocket();
                }
            });

        } catch (AmqpProtocolException e) {
            // Configuration failures raised above are reported on the link instead of propagating.
            sender.close(new ErrorCondition(Symbol.getSymbol(e.getSymbolicName()), e.getMessage()));
        }
    }