public Consumer init()

in artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/DefaultSenderController.java [122:450]


   public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
      validateConnectionState();

      this.standardMessageWriter = new AMQPMessageWriter(senderContext);
      this.largeMessageWriter = new AMQPLargeMessageWriter(senderContext);

      Map<String, String> addressParameters = Collections.emptyMap();
      Source source = (Source) protonSender.getRemoteSource();
      final Map<Symbol, Object> supportedFilters = new HashMap<>();

      // Match the settlement mode of the remote instead of relying on the default of MIXED.
      protonSender.setSenderSettleMode(protonSender.getRemoteSenderSettleMode());

      // We don't currently support SECOND so enforce that the answer is always FIRST
      protonSender.setReceiverSettleMode(ReceiverSettleMode.FIRST);

      // If the remote receiver says it can accept tunneled core then that's what we will send them
      if (verifyOfferedCapabilities(protonSender, CORE_MESSAGE_TUNNELING_SUPPORT)) {
         protonSender.setDesiredCapabilities(new Symbol[] {CORE_MESSAGE_TUNNELING_SUPPORT});
         coreTunnelingEnabled = true;
         coreMessageWriter = new AMQPTunneledCoreMessageWriter(senderContext);
         coreLargeMessageWriter = new AMQPTunneledCoreLargeMessageWriter(senderContext);
      }

      if (source != null) {
         // We look for message selectors on every receiver, while in other cases we might only
         // consume the filter depending on the subscription type.
         Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
         if (filter != null) {
            selector = filter.getValue().getDescribed().toString();
            // Validate the Selector.
            try {
               SelectorParser.parse(selector);
            } catch (FilterException e) {
               throw new ActiveMQAMQPException(AmqpError.INVALID_FIELD, "Invalid filter", ActiveMQExceptionType.INVALID_FILTER_EXPRESSION);
            }

            supportedFilters.put(filter.getKey(), filter.getValue());
         }
      }

      if (source == null) {
         // Attempt to recover a previous subscription happens when a link reattach happens on a
         // subscription queue
         String pubId = protonSender.getName();
         global = verifyDesiredCapability(protonSender, GLOBAL);
         shared = verifyDesiredCapability(protonSender, SHARED);
         queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, global, false);
         QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
         multicast = true;
         routingTypeToUse = RoutingType.MULTICAST;

         // Once confirmed that the address exists we need to return a Source that reflects
         // the lifetime policy and capabilities of the new subscription.
         if (result.isExists()) {
            source = new org.apache.qpid.proton.amqp.messaging.Source();
            source.setAddress(queue.toString());
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDistributionMode(COPY);
            source.setCapabilities(TOPIC_CAPABILITY);

            SimpleString filterString = result.getFilterString();
            if (filterString != null) {
               selector = filterString.toString();
               boolean noLocal = false;

               String remoteContainerId = protonSender.getSession().getConnection().getRemoteContainer();
               String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";

               if (selector.endsWith(noLocalFilter)) {
                  if (selector.length() > noLocalFilter.length()) {
                     noLocalFilter = " AND " + noLocalFilter;
                     selector = selector.substring(0, selector.length() - noLocalFilter.length());
                  } else {
                     selector = null;
                  }

                  noLocal = true;
               }

               if (noLocal) {
                  supportedFilters.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
               }

               if (selector != null && !selector.trim().isEmpty()) {
                  supportedFilters.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(selector));
               }
            }

            protonSender.setSource(source);
         } else {
            throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + protonSender.getName());
         }
      } else if (source.getDynamic()) {
         // if dynamic we have to create the node (queue) and set the address on the target, the
         // node is temporary and  will be deleted on closing of the session
         queue = SimpleString.of(java.util.UUID.randomUUID().toString());
         tempQueueName = queue;
         try {
            sessionSPI.createTemporaryQueue(queue, RoutingType.ANYCAST);
            // protonSession.getServerSession().createQueue(queue, queue, null, true, false);
         } catch (Exception e) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
         }
         source.setAddress(queue.toString());
      } else {
         final String sourceAddress = ParameterisedAddress.extractAddress(source.getAddress());

         addressParameters = ParameterisedAddress.extractParameters(source.getAddress());

         SimpleString addressToUse;
         SimpleString queueNameToUse = null;
         shared = verifySourceCapability(source, SHARED);
         global = verifySourceCapability(source, GLOBAL);

         final boolean isFQQN;

         //find out if we have an address made up of the address and queue name, if yes then set queue name
         if (CompositeAddress.isFullyQualified(sourceAddress)) {
            isFQQN = true;
            addressToUse = SimpleString.of(CompositeAddress.extractAddressName(sourceAddress));
            queueNameToUse = SimpleString.of(CompositeAddress.extractQueueName(sourceAddress));
         } else {
            isFQQN = false;
            addressToUse = SimpleString.of(sourceAddress);
         }

         //check to see if the client has defined how we act
         boolean clientDefined = verifySourceCapability(source, TOPIC_CAPABILITY) || verifySourceCapability(source, QUEUE_CAPABILITY);
         if (clientDefined) {
            multicast = verifySourceCapability(source, TOPIC_CAPABILITY);
            AddressQueryResult addressQueryResult = null;
            try {
               addressQueryResult = sessionSPI.addressQuery(addressToUse, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, true);
            } catch (ActiveMQSecurityException e) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
            } catch (ActiveMQAMQPException e) {
               throw e;
            } catch (Exception e) {
               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
            }

            if (!addressQueryResult.isExists()) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
            }

            Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes();

            //if the client defines 1 routing type and the broker another then throw an exception
            if (multicast && !routingTypes.contains(RoutingType.MULTICAST)) {
               throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for topic support");
            } else if (!multicast && !routingTypes.contains(RoutingType.ANYCAST)) {
               //if client specifies fully qualified name that's allowed, don't throw exception.
               if (queueNameToUse == null) {
                  throw new ActiveMQAMQPIllegalStateException("Address " + addressToUse + " is not configured for queue support");
               }
            }
         } else {
            // if not we look up the address
            AddressQueryResult addressQueryResult = null;

            // Set this to the broker configured default for the address prior to the lookup so that
            // an auto create will actually use the configured defaults.  The actual query result will
            // contain the true answer on what routing type the address actually has though.
            final RoutingType routingType = sessionSPI.getDefaultRoutingType(addressToUse);
            routingTypeToUse = routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType;

            try {
               addressQueryResult = sessionSPI.addressQuery(addressToUse, routingTypeToUse, true);
            } catch (ActiveMQSecurityException e) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
            } catch (ActiveMQAMQPException e) {
               throw e;
            } catch (Exception e) {
               throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
            }

            if (!addressQueryResult.isExists()) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
            }

            Set<RoutingType> routingTypes = addressQueryResult.getRoutingTypes();
            if (routingTypes.contains(RoutingType.MULTICAST) && routingTypes.size() == 1) {
               multicast = true;
            } else {
               //todo add some checks if both routing types are supported
               multicast = false;
            }
         }
         routingTypeToUse = multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST;
         // if not dynamic then we use the target's address as the address to forward the
         // messages to, however there has to be a queue bound to it so we need to check this.
         if (multicast) {
            Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
            if (filter != null) {
               String remoteContainerId = protonSender.getSession().getConnection().getRemoteContainer();
               String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
               if (selector != null) {
                  selector += " AND " + noLocalFilter;
               } else {
                  selector = noLocalFilter;
               }

               supportedFilters.put(filter.getKey(), filter.getValue());
            }

            SimpleString simpleStringSelector = SimpleString.of(selector);
            queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST, simpleStringSelector, isFQQN);

            //if the address specifies a broker configured queue then we always use this, treat it as a queue
            if (queue != null) {
               multicast = false;
            } else if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) {

               // if we are a subscription and durable create a durable queue using the container
               // id and link name
               String pubId = protonSender.getName();
               queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
               QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
               if (result.isExists()) {
                  /*
                   * If a client attaches to an existing durable subscription with a different filter or address then
                   * we must recreate the queue (JMS semantics). However, if the corresponding queue is managed via the
                   * configuration then we don't want to change it. We must account for optional address prefixes that
                   * are not carried over into the actual created address by stripping any prefix value that matches
                   * those configured on the acceptor.
                   */
                  if (!result.isConfigurationManaged() &&
                      (!Objects.equals(result.getAddress(), sessionSPI.removePrefix(addressToUse)) ||
                       !Objects.equals(result.getFilterString(), simpleStringSelector))) {

                     if (result.getConsumerCount() == 0) {
                        sessionSPI.deleteQueue(queue);
                        if (shared) {
                           sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                        } else {
                           sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                        }
                     } else {
                        throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist");
                     }
                  }
               } else {
                  if (shared) {
                     sessionSPI.createSharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                  } else {
                     sessionSPI.createUnsharedDurableQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                  }
               }
            } else {
               // otherwise we are a volatile subscription
               isVolatile = true;
               if (shared && protonSender.getName() != null) {
                  queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, protonSender.getName(), shared, global, isVolatile);
                  QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
                  if ((!result.isExists() || !Objects.equals(result.getAddress(), addressToUse) || !Objects.equals(result.getFilterString(), simpleStringSelector)) && !result.isConfigurationManaged()) {
                     sessionSPI.createSharedVolatileQueue(addressToUse, RoutingType.MULTICAST, queue, simpleStringSelector);
                  }
               } else {
                  queue = SimpleString.of(java.util.UUID.randomUUID().toString());
                  tempQueueName = queue;
                  try {
                     sessionSPI.createTemporaryQueue(addressToUse, queue, RoutingType.MULTICAST, simpleStringSelector);
                  } catch (Exception e) {
                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage());
                  }
               }
            }
         } else {
            if (queueNameToUse != null) {
               SimpleString matchingAnycastQueue;
               QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(addressToUse, queueNameToUse), null, false, null);
               if (result.isExists()) {
                  // if the queue exists and we're using FQQN then just ignore the routing-type
                  routingTypeToUse = null;
               }
               matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, routingTypeToUse, null, false);
               if (matchingAnycastQueue != null) {
                  queue = matchingAnycastQueue;
               } else {
                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
               }
            } else {
               SimpleString matchingAnycastQueue = sessionSPI.getMatchingQueue(addressToUse, RoutingType.ANYCAST);
               if (matchingAnycastQueue != null) {
                  queue = matchingAnycastQueue;
               } else {
                  queue = addressToUse;
               }
            }
         }

         if (queue == null) {
            throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet();
         }

         try {
            if (!sessionSPI.queueQuery(queue, routingTypeToUse, !multicast).isExists()) {
               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist();
            }
         } catch (ActiveMQAMQPNotFoundException e) {
            throw e;
         } catch (Exception e) {
            throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
         }
      }

      // We need to update the source with any filters we support otherwise the client
      // is free to consider the attach as having failed if we don't send back what we
      // do support or if we send something we don't support the client won't know we
      // have not honored what it asked for.
      source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);

      final boolean browseOnly = !multicast && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY);
      final Number consumerPriority = getReceiverPriority(protonSender.getRemoteProperties(), extractConsumerPriority(addressParameters));

      // Any new parameters used should be extracted from the values parsed from the address to avoid this log message.
      if (!addressParameters.isEmpty()) {
         final String unusedParametersMessage = ""
            + " Not all specified address options were applicable to the created server consumer."
            + " Check the options are spelled correctly."
            + " Unused parameters=[" + addressParameters + "].";

         logger.debug(unusedParametersMessage);
      }

      return sessionSPI.createSender(senderContext, queue, multicast ? null : selector, browseOnly, consumerPriority);
   }