public Consumer init()

in artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java [988:1292]


      /**
       * Prepares the sender link for use: negotiates settle modes, works out which queue the
       * link should consume from (creating or recreating it where needed) and finally asks the
       * session SPI to create the server side consumer for that queue.
       * <p>
       * Three cases are distinguished based on the remote {@code Source}:
       * <ul>
       * <li>no remote source: an existing subscription queue, named from the client id and the
       * link name, is looked up and a new {@code Source} describing it is set on the sender;</li>
       * <li>dynamic source: a temporary ANYCAST queue with a random UUID name is created and its
       * name is written back as the source address;</li>
       * <li>otherwise: the source address (optionally a fully qualified address::queue name) is
       * resolved, the routing type is taken from the source capabilities or from an address
       * query, and a matching queue is located or a subscription queue is created.</li>
       * </ul>
       * Along the way this method assigns {@code selector}, {@code queue}, {@code multicast},
       * {@code routingTypeToUse}, {@code shared}, {@code global}, {@code isVolatile},
       * {@code tempQueueName} and {@code preSettle}, none of which are declared locally
       * (presumably fields of the enclosing class - the declarations are outside this method).
       *
       * @param senderContext the sender context handed through to {@code sessionSPI.createSender}
       * @return the object returned by {@code sessionSPI.createSender}, cast to {@code Consumer}
       * @throws Exception if the selector is invalid, the address or subscription cannot be found,
       *         the routing type requested by the client is not supported by the address, a
       *         queue cannot be created, or a broker call fails
       */
      public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
         Source source = (Source) sender.getRemoteSource();
         // Filters that were actually honored; these are echoed back on the source at the end.
         final Map<Symbol, Object> supportedFilters = new HashMap<>();

         // Match the settlement mode of the remote instead of relying on the default of MIXED.
         sender.setSenderSettleMode(sender.getRemoteSenderSettleMode());

         // We don't currently support SECOND so enforce that the answer is always FIRST
         sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);

         if (source != null) {
            // We look for message selectors on every receiver, while in other cases we might only
            // consume the filter depending on the subscription type.
            Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
            if (filter != null) {
               selector = filter.getValue().getDescribed().toString();
               // Validate the Selector.
               try {
                  SelectorParser.parse(selector);
               } catch (FilterException e) {
                  // NOTE(review): the FilterException is not attached as the cause here, so the
                  // parser's detail is lost - consider passing it along if a suitable constructor exists.
                  throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
               }

               supportedFilters.put(filter.getKey(), filter.getValue());
            }
         }

         if (source == null) {
            // Attempt to recover a previous subscription happens when a link reattach happens on a
            // subscription queue
            String clientId = getClientId();
            String pubId = sender.getName();
            // With no source to inspect, the shared / global hints are read from the link's
            // remote desired capabilities instead of the source capabilities.
            global = hasRemoteDesiredCapability(sender, GLOBAL);
            shared = hasRemoteDesiredCapability(sender, SHARED);
            // NOTE(review): the fourth argument is a literal true here whereas the other
            // createQueueName calls in this method pass 'shared' in that position - confirm intended.
            queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, global, false);
            QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
            multicast = true;
            routingTypeToUse = RoutingType.MULTICAST;

            // Once confirmed that the address exists we need to return a Source that reflects
            // the lifetime policy and capabilities of the new subscription.
            if (result.isExists()) {
               source = new org.apache.qpid.proton.amqp.messaging.Source();
               source.setAddress(queue.toString());
               source.setDurable(TerminusDurability.UNSETTLED_STATE);
               source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
               source.setDistributionMode(COPY);
               source.setCapabilities(TOPIC);

               // Rebuild the filters to report back from the filter string stored on the queue.
               SimpleString filterString = result.getFilterString();
               if (filterString != null) {
                  selector = filterString.toString();
                  boolean noLocal = false;

                  // Same no-local clause format that the multicast branch below appends to the
                  // selector when the subscription is created.
                  String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
                  String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";

                  if (selector.endsWith(noLocalFilter)) {
                     if (selector.length() > noLocalFilter.length()) {
                        // A user selector precedes the clause: strip the " AND " joiner along
                        // with the clause itself to recover the original selector text.
                        noLocalFilter = " AND " + noLocalFilter;
                        selector = selector.substring(0, selector.length() - noLocalFilter.length());
                     } else {
                        // The stored filter was only the no-local clause, no selector remains.
                        selector = null;
                     }

                     noLocal = true;
                  }

                  if (noLocal) {
                     supportedFilters.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
                  }

                  if (selector != null && !selector.trim().isEmpty()) {
                     supportedFilters.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(selector));
                  }
               }

               sender.setSource(source);
            } else {
               throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName());
            }
         } else if (source.getDynamic()) {
            // if dynamic we have to create the node (queue) and set the address on the source, the
            // node is temporary and will be deleted on closing of the session
            queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
            tempQueueName = queue;
            try {
               sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
               // protonSession.getServerSession().createQueue(queue, queue, null, true, false);
            } catch (Exception e) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
            }
            source.setAddress(queue.toString());
         } else {
            // Regular attach: the client named an address (or address::queue) to consume from.
            SimpleString addressToUse;
            SimpleString queueNameToUse = null;
            shared = hasCapabilities(SHARED, source);
            global = hasCapabilities(GLOBAL, source);

            final boolean isFQQN;

            //find out if we have an address made up of the address and queue name, if yes then set queue name
            if (CompositeAddress.isFullyQualified(source.getAddress())) {
               isFQQN = true;
               addressToUse = SimpleString.toSimpleString(CompositeAddress.extractAddressName(source.getAddress()));
               queueNameToUse = SimpleString.toSimpleString(CompositeAddress.extractQueueName(source.getAddress()));
            } else {
               isFQQN = false;
               addressToUse = SimpleString.toSimpleString(source.getAddress());
            }
            //check to see if the client has defined how we act
            boolean clientDefined = hasCapabilities(TOPIC, source) || hasCapabilities(QUEUE, source);
            if (clientDefined) {
               // The TOPIC capability wins: multicast is true whenever TOPIC is present,
               // otherwise (only QUEUE present) the link is treated as anycast.
               multicast = hasCapabilities(TOPIC, source);
               AddressQueryResult addressQueryResult = null;
               try {
                  addressQueryResult = sessionSPI.addressQuery(addressToUse, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
               } catch (ActiveMQSecurityException e) {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
               } catch (ActiveMQAMQPException e) {
                  throw e;
               } catch (Exception e) {
                  throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
               }

               if (!addressQueryResult.isExists()) {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
               }

               Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes();

               //if the client defines 1 routing type and the broker another then throw an exception
               if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
                  throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for topic support");
               } else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
                  //if client specifies fully qualified name that's allowed, don't throw exception.
                  if (queueNameToUse == null) {
                     throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
                  }
               }
            } else {
               // if not we look up the address
               AddressQueryResult addressQueryResult = null;

               // Set this to the broker configured default for the address prior to the lookup so that
               // an auto create will actually use the configured defaults.  The actual query result will
               // contain the true answer on what routing type the address actually has though.
               final RoutingType routingType = sessionSPI.getDefaultRoutingType(addressToUse);
               routingTypeToUse = routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType;

               try {
                  addressQueryResult = sessionSPI.addressQuery(addressToUse, routingTypeToUse, true);
               } catch (ActiveMQSecurityException e) {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
               } catch (ActiveMQAMQPException e) {
                  throw e;
               } catch (Exception e) {
                  throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
               }

               if (!addressQueryResult.isExists()) {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
               }

               // Only treat the link as multicast when MULTICAST is the sole routing type of
               // the address; an address supporting both types falls back to anycast.
               Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes();
               if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) {
                  multicast = true;
               } else {
                  //todo add some checks if both routing types are supported
                  multicast = false;
               }
            }
            // From here on the routing type follows the multicast decision made above.
            routingTypeToUse = multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST;
            // if not dynamic then we use the source's address as the address to consume
            // messages from, however there has to be a queue bound to it so we need to check this.
            if (multicast) {
               // A no-local request is implemented by AND-ing a clause on the connection id
               // property (compared against the remote container id) onto the selector.
               Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
               if (filter != null) {
                  String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
                  String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
                  if (selector != null) {
                     selector += " AND " + noLocalFilter;
                  } else {
                     selector = noLocalFilter;
                  }

                  supportedFilters.put(filter.getKey(), filter.getValue());
               }

               SimpleString simpleStringSelector = SimpleString.toSimpleString(selector);
               queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST, simpleStringSelector, isFQQN);

               //if the address specifies a broker configured queue then we always use this, treat it as a queue
               if (queue != null) {
                  multicast = false;
               } else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {

                  // if we are a subscription and durable create a durable queue using the container
                  // id and link name
                  String clientId = getClientId();
                  String pubId = sender.getName();
                  queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
                  QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
                  if (result.isExists()) {
                     /*
                      * If a client attaches to an existing durable subscription with a different filter or address then
                      * we must recreate the queue (JMS semantics). However, if the corresponding queue is managed via the
                      * configuration then we don't want to change it. We must account for optional address prefixes that
                      * are not carried over into the actual created address by stripping any prefix value that matches
                      * those configured on the acceptor.
                      */
                     if (!result.isConfigurationManaged() &&
                         (!Objects.equals(result.getAddress(), sessionSPI.removePrefix(addressToUse)) ||
                          !Objects.equals(result.getFilterString(), simpleStringSelector))) {

                        // Recreation is only possible while nobody is consuming from the old queue.
                        if (result.getConsumerCount() == 0) {
                           sessionSPI.deleteQueue(queue);
                           if (shared) {
                              sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                           } else {
                              sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                           }
                        } else {
                           throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                        }
                     }
                  } else {
                     // First attach for this durable subscription, create its queue.
                     if (shared) {
                        sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                     } else {
                        sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                     }
                  }
               } else {
                  // otherwise we are a volatile subscription
                  isVolatile = true;
                  if (shared && sender.getName() != null) {
                     // Shared volatile subscription: the queue name is derived from the client
                     // id and link name so that other links can attach to the same queue.
                     queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile);
                     QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
                     // NOTE(review): unlike the durable path above, the address comparison here
                     // does not go through sessionSPI.removePrefix - confirm whether a prefixed
                     // address can reach this point.
                     if ((!result.isExists() || !Objects.equals(result.getAddress(), addressToUse) || !Objects.equals(result.getFilterString(), simpleStringSelector)) && !result.isConfigurationManaged()) {
                        sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                     }
                  } else {
                     // Non shared volatile subscription: back it with a temporary queue that
                     // has a random UUID name.
                     queue = SimpleString.toSimpleString(java.util.UUID.randomUUID().toString());
                     tempQueueName = queue;
                     try {
                        sessionSPI.createTemporaryQueue(addressToUse, queue, RoutingType.MULTICAST, simpleStringSelector);
                     } catch (Exception e) {
                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                     }
                  }
               }
            } else {
               // Anycast: consume from an existing queue, nothing is created in this branch.
               if (queueNameToUse != null) {
                  SimpleString matchingAnycastQueue;
                  QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(addressToUse, queueNameToUse), null, false, null);
                  if (result.isExists()) {
                     // if the queue exists and we're using FQQN then just ignore the routing-type
                     routingTypeToUse = null;
                  }
                  matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, routingTypeToUse, null, false);
                  if (matchingAnycastQueue != null) {
                     queue = matchingAnycastQueue;
                  } else {
                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
                  }
               } else {
                  // No queue name given: prefer a queue reported by the SPI for this address and
                  // fall back to a queue named the same as the address.
                  SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
                  if (matchingAnycastQueue != null) {
                     queue = matchingAnycastQueue;
                  } else {
                     queue = addressToUse;
                  }
               }

            }

            if (queue == null) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
            }

            // Final check that the chosen queue exists; the third argument (!multicast) differs
            // between the anycast and multicast cases.
            // NOTE(review): presumably that flag permits auto creation for anycast - verify
            // against the queueQuery signature.
            try {
               if (!sessionSPI.queueQuery(queue, routingTypeToUse, !multicast).isExists()) {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
               }
            } catch (ActiveMQAMQPNotFoundException e) {
               throw e;
            } catch (Exception e) {
               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
            }
         }

         // Detect if sender is in pre-settle mode.
         preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;

         // We need to update the source with any filters we support otherwise the client
         // is free to consider the attach as having failed if we don't send back what we
         // do support or if we send something we don't support the client won't know we
         // have not honored what it asked for.
         source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);

         // A non multicast link whose source asks for the COPY distribution mode is created as
         // a browser.
         boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);

         // For multicast the selector was already supplied when the subscription queue was
         // created (or read back from it), so it is only passed to the consumer otherwise.
         return (Consumer) sessionSPI.createSender(senderContext, queue, multicast ? null : selector, browseOnly);
      }