private void slowPacketHandler()

in artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java [326:700]


   private void slowPacketHandler(final Packet packet) {
      final byte type = packet.getType();
      storageManager.setContext(session.getSessionContext());

      Packet response = null;
      boolean flush = false;
      boolean closeChannel = false;
      boolean requiresResponse = false;

      try {
         try {
            switch (type) {
               case SESS_SEND_LARGE: {
                  SessionSendLargeMessage message = (SessionSendLargeMessage) packet;
                  sendLarge(message.getLargeMessage());
                  break;
               }
               case SESS_SEND_CONTINUATION: {
                  SessionSendContinuationMessage message = (SessionSendContinuationMessage) packet;
                  requiresResponse = message.isRequiresResponse();
                  int senderID = message.getSenderID();
                  sendContinuations(message.getPacketSize(), message.getMessageBodySize(), message.getBody(), message.isContinues(), senderID);
                  if (requiresResponse) {
                     response = createNullResponseMessage(packet);
                  }
                  break;
               }
               case SESS_CREATECONSUMER: {
                  SessionCreateConsumerMessage request = (SessionCreateConsumerMessage) packet;
                  requiresResponse = request.isRequiresResponse();
                  session.createConsumer(request.getID(), request.getQueueName(), request.getFilterString(), request.getPriority(), request.isBrowseOnly(), true, null);
                  if (requiresResponse) {
                     // We send back queue information on the queue as a response- this allows the queue to
                     // be automatically recreated on failover
                     QueueQueryResult queueQueryResult = session.executeQueueQuery(request.getQueueName());

                     if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
                        response = new SessionQueueQueryResponseMessage_V3(queueQueryResult);
                     } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
                        response = new SessionQueueQueryResponseMessage_V2(queueQueryResult);
                     } else {
                        response = new SessionQueueQueryResponseMessage(queueQueryResult);
                     }
                  }

                  break;
               }
               case CREATE_ADDRESS: {
                  CreateAddressMessage request = (CreateAddressMessage) packet;
                  requiresResponse = request.isRequiresResponse();
                  session.createAddress(request.getAddress(), request.getRoutingTypes(), request.isAutoCreated());
                  if (requiresResponse) {
                     response = createNullResponseMessage(packet);
                  }
                  break;
               }
               case CREATE_QUEUE: {
                  CreateQueueMessage request = (CreateQueueMessage) packet;
                  requiresResponse = request.isRequiresResponse();
                  session.createQueue(QueueConfiguration.of(request.getQueueName())
                                         .setAddress(request.getAddress())
                                         .setRoutingType(getRoutingTypeFromAddress(request.getAddress()))
                                         .setFilterString(request.getFilterString())
                                         .setTemporary(request.isTemporary())
                                         .setDurable(request.isDurable()));
                  if (requiresResponse) {
                     response = createNullResponseMessage(packet);
                  }
                  break;
               }
               case CREATE_QUEUE_V2: {
                  CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet;
                  requiresResponse = request.isRequiresResponse();
                  session.createQueue(request.toQueueConfiguration());

                  if (requiresResponse) {
                     response = createNullResponseMessage(packet);
                  }
                  break;
               }
               case CREATE_SHARED_QUEUE: {
                  CreateSharedQueueMessage request = (CreateSharedQueueMessage) packet;
                  requiresResponse = request.isRequiresResponse();
                  QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
                  if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) {
                     session.createSharedQueue(QueueConfiguration.of(request.getQueueName())
                                                  .setAddress(request.getAddress())
                                                  .setFilterString(request.getFilterString())
                                                  .setDurable(request.isDurable()));
                  }
                  if (requiresResponse) {
                     response = createNullResponseMessage(packet);
                  }
                  break;
               }
               case CREATE_SHARED_QUEUE_V2: {
                  CreateSharedQueueMessage_V2 request = (CreateSharedQueueMessage_V2) packet;
                  requiresResponse = request.isRequiresResponse();
                  QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
                  if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) {
                     session.createSharedQueue(request.toQueueConfiguration());
                  }
                  if (requiresResponse) {
                     response = createNullResponseMessage(packet);
                  }
                  break;
               }
               case DELETE_QUEUE: {
                  requiresResponse = true;
                  SessionDeleteQueueMessage request = (SessionDeleteQueueMessage) packet;
                  session.deleteQueue(request.getQueueName());
                  response = createNullResponseMessage(packet);
                  break;
               }
               case SESS_QUEUEQUERY: {
                  requiresResponse = true;
                  SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet;
                  QueueQueryResult result = session.executeQueueQuery(request.getQueueName());

                  if (result.isExists() && remotingConnection.getChannelVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
                     result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType()));
                  }

                  if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) {
                     response = new SessionQueueQueryResponseMessage_V3(result);
                  } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {
                     response = new SessionQueueQueryResponseMessage_V2(result);
                  } else {
                     response = new SessionQueueQueryResponseMessage(result);
                  }
                  break;
               }
               case SESS_BINDINGQUERY: {
                  requiresResponse = true;
                  SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
                  final int clientVersion = remotingConnection.getChannelVersion();
                  BindingQueryResult result = session.executeBindingQuery(request.getAddress());

                  /*
                   * if the session is JMS and it's from an older client then we need to add the old prefix to the queue
                   * names otherwise the older client won't realize the queue exists and will try to create it and receive
                   * an error
                   */
                  if (result.isExists() && clientVersion < PacketImpl.ADDRESSING_CHANGE_VERSION && session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) {
                     final List<SimpleString> queueNames = result.getQueueNames();
                     if (!queueNames.isEmpty()) {
                        final List<SimpleString> convertedQueueNames = request.convertQueueNames(clientVersion, queueNames);
                        if (convertedQueueNames != queueNames) {
                           result = new BindingQueryResult(result.isExists(), result.getAddressInfo(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch());
                        }
                     }
                  }

                  if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V5)) {
                     response = new SessionBindingQueryResponseMessage_V5(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch(), result.getAddressInfo() == null ? false : result.getAddressInfo().getRoutingTypes().contains(RoutingType.MULTICAST), result.getAddressInfo() == null ? false : result.getAddressInfo().getRoutingTypes().contains(RoutingType.ANYCAST));
                  } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
                     response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch());
                  } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
                     response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses());
                  } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {
                     response = new SessionBindingQueryResponseMessage_V2(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues());
                  } else {
                     response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
                  }
                  break;
               }
               case SESS_EXPIRED: {
                  SessionExpireMessage message = (SessionExpireMessage) packet;
                  session.expire(message.getConsumerID(), message.getMessageID());
                  break;
               }
               case SESS_COMMIT: {
                  requiresResponse = true;
                  session.commit();
                  response = createNullResponseMessage(packet);
                  break;
               }
               case SESS_ROLLBACK: {
                  requiresResponse = true;
                  session.rollback(((RollbackMessage) packet).isConsiderLastMessageAsDelivered());
                  response = createNullResponseMessage(packet);
                  break;
               }
               case SESS_XA_COMMIT: {
                  requiresResponse = true;
                  SessionXACommitMessage message = (SessionXACommitMessage) packet;
                  session.xaCommit(message.getXid(), message.isOnePhase());
                  response = createSessionXAResponseMessage(packet);
                  break;
               }
               case SESS_XA_END: {
                  requiresResponse = true;
                  SessionXAEndMessage message = (SessionXAEndMessage) packet;
                  session.xaEnd(message.getXid());
                  response = createSessionXAResponseMessage(packet);
                  break;
               }
               case SESS_XA_FORGET: {
                  requiresResponse = true;
                  SessionXAForgetMessage message = (SessionXAForgetMessage) packet;
                  session.xaForget(message.getXid());
                  response = createSessionXAResponseMessage(packet);
                  break;
               }
               case SESS_XA_JOIN: {
                  requiresResponse = true;
                  SessionXAJoinMessage message = (SessionXAJoinMessage) packet;
                  session.xaJoin(message.getXid());
                  response = createSessionXAResponseMessage(packet);
                  break;
               }
               case SESS_XA_RESUME: {
                  requiresResponse = true;
                  SessionXAResumeMessage message = (SessionXAResumeMessage) packet;
                  session.xaResume(message.getXid());
                  response = createSessionXAResponseMessage(packet);
                  break;
               }
               case SESS_XA_ROLLBACK: {
                  requiresResponse = true;
                  SessionXARollbackMessage message = (SessionXARollbackMessage) packet;
                  session.xaRollback(message.getXid());
                  response = createSessionXAResponseMessage(packet);
                  break;
               }
               case SESS_XA_START: {
                  requiresResponse = true;
                  SessionXAStartMessage message = (SessionXAStartMessage) packet;
                  session.xaStart(message.getXid());
                  response = createSessionXAResponseMessage(packet);
                  break;
               }
               case SESS_XA_FAILED: {
                  requiresResponse = true;
                  SessionXAAfterFailedMessage message = (SessionXAAfterFailedMessage) packet;
                  session.xaFailed(message.getXid());
                  // no response on this case
                  break;
               }
               case SESS_XA_SUSPEND: {
                  requiresResponse = true;
                  session.xaSuspend();
                  response = createSessionXAResponseMessage(packet);
                  break;
               }
               case SESS_XA_PREPARE: {
                  requiresResponse = true;
                  SessionXAPrepareMessage message = (SessionXAPrepareMessage) packet;
                  session.xaPrepare(message.getXid());
                  response = createSessionXAResponseMessage(packet);
                  break;
               }
               case SESS_XA_INDOUBT_XIDS: {
                  requiresResponse = true;
                  List<Xid> xids = session.xaGetInDoubtXids();
                  response = new SessionXAGetInDoubtXidsResponseMessage(xids);
                  break;
               }
               case SESS_XA_GET_TIMEOUT: {
                  requiresResponse = true;
                  int timeout = session.xaGetTimeout();
                  response = new SessionXAGetTimeoutResponseMessage(timeout);
                  break;
               }
               case SESS_XA_SET_TIMEOUT: {
                  requiresResponse = true;
                  SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage) packet;
                  session.xaSetTimeout(message.getTimeoutSeconds());
                  response = new SessionXASetTimeoutResponseMessage(true);
                  break;
               }
               case SESS_START: {
                  session.start();
                  break;
               }
               case SESS_STOP: {
                  requiresResponse = true;
                  session.stop();
                  response = createNullResponseMessage(packet);
                  break;
               }
               case SESS_CLOSE: {
                  requiresResponse = true;
                  session.close(false);
                  // removeConnectionListeners();
                  response = createNullResponseMessage(packet);
                  flush = true;
                  closeChannel = true;
                  break;
               }
               case SESS_INDIVIDUAL_ACKNOWLEDGE: {
                  SessionIndividualAcknowledgeMessage message = (SessionIndividualAcknowledgeMessage) packet;
                  requiresResponse = message.isRequiresResponse();
                  session.individualAcknowledge(message.getConsumerID(), message.getMessageID());
                  if (requiresResponse) {
                     response = createNullResponseMessage(packet);
                  }
                  break;
               }
               case SESS_CONSUMER_CLOSE: {
                  requiresResponse = true;
                  SessionConsumerCloseMessage message = (SessionConsumerCloseMessage) packet;
                  session.closeConsumer(message.getConsumerID());
                  response = createNullResponseMessage(packet);
                  break;
               }
               case SESS_FORCE_CONSUMER_DELIVERY: {
                  SessionForceConsumerDelivery message = (SessionForceConsumerDelivery) packet;
                  session.forceConsumerDelivery(message.getConsumerID(), message.getSequence());
                  break;
               }
               case PacketImpl.SESS_ADD_METADATA: {
                  response = createNullResponseMessage(packet);
                  SessionAddMetaDataMessage message = (SessionAddMetaDataMessage) packet;
                  session.addMetaData(message.getKey(), message.getData());
                  break;
               }
               case PacketImpl.SESS_ADD_METADATA2: {
                  requiresResponse = true;
                  SessionAddMetaDataMessageV2 message = (SessionAddMetaDataMessageV2) packet;
                  if (message.isRequiresConfirmations()) {
                     response = createNullResponseMessage(packet);
                  }
                  session.addMetaData(message.getKey(), message.getData());
                  break;
               }
               case PacketImpl.SESS_UNIQUE_ADD_METADATA: {
                  requiresResponse = true;
                  SessionUniqueAddMetaDataMessage message = (SessionUniqueAddMetaDataMessage) packet;
                  if (session.addUniqueMetaData(message.getKey(), message.getData())) {
                     response = createNullResponseMessage(packet);
                  } else {
                     response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.duplicateMetadata(message.getKey(), message.getData()));
                  }
                  break;
               }
               case CREATE_PRODUCER: {
                  CreateProducerMessage message = (CreateProducerMessage) packet;
                  if (!producers.containsKey(message.getId())) {
                     // this is used to create/destroy the producer so needs to be unique
                     String senderName = PRODUCER_ID_PREFIX + UUIDGenerator.getInstance().generateUUID();
                     producers.put(message.getId(), senderName);
                     session.addProducer(senderName, ActiveMQClient.DEFAULT_CORE_PROTOCOL, message.getAddress() != null ? message.getAddress().toString() : null);
                  } else {
                     ActiveMQServerLogger.LOGGER.producerAlreadyExists(message.getId(), session.getName(), remotingConnection.getRemoteAddress());
                  }
                  break;
               }
               case REMOVE_PRODUCER: {
                  RemoveProducerMessage message = (RemoveProducerMessage) packet;
                  String remove = producers.remove(message.getId());
                  if (remove != null) {
                     session.removeProducer(remove);
                  } else {
                     ActiveMQServerLogger.LOGGER.producerDoesNotExist(message.getId(), session.getName(), remotingConnection.getRemoteAddress());
                  }
                  break;
               }
            }
         } catch (ActiveMQIOErrorException e) {
            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
         } catch (ActiveMQXAException e) {
            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
         } catch (ActiveMQQueueMaxConsumerLimitReached e) {
            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
         } catch (ActiveMQException e) {
            response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
         } catch (Throwable t) {
            response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
         }
         sendResponse(packet, response, flush, closeChannel);
      } finally {
         storageManager.clearContext();
      }
   }