private static ActiveMQMessage toAMQMessage()

in artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java [505:689]


   private static ActiveMQMessage toAMQMessage(MessageReference reference,
                                               ICoreMessage coreMessage,
                                               WireFormat marshaller,
                                               AMQConsumer consumer, UUID serverNodeUUID) throws IOException {
      final ActiveMQMessage amqMsg;
      final byte coreType = coreMessage.getType();
      final Boolean compressProp = getObjectProperty(coreMessage, Boolean.class, OpenWireConstants.AMQ_MSG_COMPRESSED);
      final boolean isCompressed = compressProp != null && compressProp;
      final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
      buffer.resetReaderIndex();

      final byte[] bytes = switch (coreType) {
         case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE -> {
            amqMsg = new EagerActiveMQBytesMessage(0);
            yield toAMQMessageBytesType(buffer, isCompressed);
         }
         case org.apache.activemq.artemis.api.core.Message.MAP_TYPE -> {
            amqMsg = new ActiveMQMapMessage();
            yield toAMQMessageMapType(buffer, isCompressed);
         }
         case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE -> {
            amqMsg = new ActiveMQObjectMessage();
            yield toAMQMessageObjectType(buffer, isCompressed);
         }
         case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE -> {
            amqMsg = new ActiveMQStreamMessage();
            yield toAMQMessageStreamType(buffer, isCompressed);
         }
         case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE -> {
            amqMsg = new ActiveMQTextMessage();
            yield toAMQMessageTextType(buffer, isCompressed);
         }
         case org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE -> {
            amqMsg = new ActiveMQMessage();
            yield toAMQMessageDefaultType(buffer, isCompressed);
         }
         default -> throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
      };

      final String type = getObjectProperty(coreMessage, String.class, OpenWireConstants.JMS_TYPE_PROPERTY);
      if (type != null) {
         amqMsg.setJMSType(type);
      }
      amqMsg.setPersistent(coreMessage.isDurable());
      amqMsg.setExpiration(coreMessage.getExpiration());
      amqMsg.setPriority(coreMessage.getPriority());
      amqMsg.setTimestamp(coreMessage.getTimestamp());

      Long brokerInTime = getObjectProperty(coreMessage, Long.class, OpenWireConstants.AMQ_MSG_BROKER_IN_TIME);
      if (brokerInTime == null) {
         brokerInTime = 0L;
      }
      amqMsg.setBrokerInTime(brokerInTime);

      amqMsg.setCompressed(isCompressed);

      //we need check null because messages may come from other clients
      //and those amq specific attribute may not be set.
      Long arrival = getObjectProperty(coreMessage, Long.class, OpenWireConstants.AMQ_MSG_ARRIVAL);
      if (arrival == null) {
         //messages from other sources (like core client) may not set this prop
         arrival = 0L;
      }
      amqMsg.setArrival(arrival);

      final SimpleString brokerPath = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_BROKER_PATH);
      if (brokerPath != null && !brokerPath.isEmpty()) {
         setAMQMsgBrokerPath(amqMsg, brokerPath.toString());
      }

      final SimpleString clusterPath = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_CLUSTER);
      if (clusterPath != null && !clusterPath.isEmpty()) {
         setAMQMsgClusterPath(amqMsg, clusterPath.toString());
      }

      Integer commandId = getObjectProperty(coreMessage, Integer.class, OpenWireConstants.AMQ_MSG_COMMAND_ID);
      if (commandId == null) {
         commandId = -1;
      }
      amqMsg.setCommandId(commandId);

      final Object correlationID = coreMessage.getCorrelationID();
      if (correlationID instanceof String || correlationID instanceof SimpleString) {
         amqMsg.setCorrelationId(correlationID.toString());
      } else if (correlationID instanceof byte[] correlationIdBytes) {
         try {
            amqMsg.setCorrelationId(StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(correlationIdBytes)).toString());
         } catch (MalformedInputException e) {
            ActiveMQServerLogger.LOGGER.unableToDecodeCorrelationId(e.getMessage());
         }
      }

      final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, OpenWireConstants.AMQ_MSG_DATASTRUCTURE);
      if (dsBytes != null) {
         setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
      }
      final ActiveMQDestination actualDestination = consumer.getOpenwireDestination();
      amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));

      final Object value = coreMessage.getGroupID();
      if (value != null) {
         String groupId = value.toString();
         amqMsg.setGroupID(groupId);
      }

      amqMsg.setGroupSequence(coreMessage.getGroupSequence());

      final Object messageIdValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_MESSAGE_ID);
      final MessageId messageId;
      if (messageIdValue instanceof SimpleString) {
         messageId = new MessageId(messageIdValue.toString());
      } else if (messageIdValue instanceof byte[] messageIdBytes) {
         ByteSequence midSeq = new ByteSequence(messageIdBytes);
         messageId = (MessageId) marshaller.unmarshal(midSeq);
      } else {
         //  ARTEMIS-3776 due to AMQ-6431 some older clients will not be able to receive messages
         // if using a failover schema due to the messageID overFlowing Integer.MAX_VALUE
         String midd = "ID:" + serverNodeUUID + ":-1:-1:" + (coreMessage.getMessageID() / Integer.MAX_VALUE);
         messageId = new MessageId(midd, coreMessage.getMessageID() % Integer.MAX_VALUE);
      }

      amqMsg.setMessageId(messageId);

      final Object origDestValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_ORIG_DESTINATION);
      if (origDestValue instanceof SimpleString) {
         amqMsg.setOriginalDestination(ActiveMQDestination.createDestination(origDestValue.toString(), QUEUE_TYPE));
      } else if (origDestValue instanceof byte[] origDestValueBytes) {
         ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestValueBytes));
         amqMsg.setOriginalDestination(origDest);
      }

      final Object producerIdValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_PRODUCER_ID);
      if (producerIdValue instanceof SimpleString simpleString && !simpleString.isEmpty()) {
         amqMsg.setProducerId(new ProducerId(producerIdValue.toString()));
      } else if (producerIdValue instanceof byte[] producerIdBytes) {
         ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes));
         amqMsg.setProducerId(producerId);
      }

      amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);

      final Object replyToValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_REPLY_TO);
      if (replyToValue instanceof SimpleString) {
         amqMsg.setReplyTo(ActiveMQDestination.createDestination(replyToValue.toString(), QUEUE_TYPE));
      } else if (replyToValue instanceof byte[] replyToBytes) {
         ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
         amqMsg.setReplyTo(replyTo);
      }

      final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_USER_ID);
      if (userId != null && !userId.isEmpty()) {
         amqMsg.setUserID(userId.toString());
      }

      final Boolean isDroppable = getObjectProperty(coreMessage, Boolean.class, OpenWireConstants.AMQ_MSG_DROPPABLE);
      if (isDroppable != null) {
         amqMsg.setDroppable(isDroppable);
      }

      final SimpleString dlqCause = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
      if (dlqCause != null) {
         setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
      }

      final SimpleString lastValueProperty = coreMessage.getLastValueProperty();
      if (lastValueProperty != null) {
         setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
      }

      final Long ingressTimestamp = getObjectProperty(coreMessage, Long.class, HDR_INGRESS_TIMESTAMP);
      if (ingressTimestamp != null) {
         setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp);
      }

      final Set<SimpleString> props = coreMessage.getPropertyNames();
      if (props != null) {
         setAMQMsgObjectProperties(amqMsg, coreMessage, props);
      }

      if (bytes != null) {
         ByteSequence content = new ByteSequence(bytes);
         amqMsg.setContent(content);
      }
      return amqMsg;
   }