private static ActiveMQMessage toAMQMessage()

in artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java [509:689]


   /**
    * Converts a core message into the OpenWire {@link ActiveMQMessage} handed to an OpenWire consumer.
    * <p>
    * The concrete OpenWire message class and the body conversion are chosen from the core message type.
    * Headers and AMQ-specific attributes are then copied over from the core message; where a property is
    * absent (the message may have been produced by a non-OpenWire client) a default is applied or the
    * attribute is simply left unset.
    *
    * @param reference      reference whose delivery count yields the redelivery counter
    * @param coreMessage    the core message being converted
    * @param marshaller     wire format used to unmarshal properties that were stored as byte arrays
    * @param consumer       consumer whose OpenWire destination is used to resolve the message destination
    * @param serverNodeUUID node id embedded in the generated message id when the core message has none stored
    * @return the populated OpenWire message
    * @throws IOException           declared for the conversion helpers and marshaller calls used here
    * @throws IllegalStateException if the core message type is not one of the handled types
    */
   private static ActiveMQMessage toAMQMessage(MessageReference reference,
                                               ICoreMessage coreMessage,
                                               WireFormat marshaller,
                                               AMQConsumer consumer, UUID serverNodeUUID) throws IOException {
      final ActiveMQMessage openWireMsg;
      final byte messageType = coreMessage.getType();
      final Boolean compressedFlag = getObjectProperty(coreMessage, Boolean.class, OpenWireConstants.AMQ_MSG_COMPRESSED);
      // A missing flag is treated as "not compressed".
      final boolean isCompressed = Boolean.TRUE.equals(compressedFlag);
      final byte[] payload;
      final ActiveMQBuffer body = coreMessage.getDataBuffer();
      // Rewind the reader so the body is converted from its beginning.
      body.resetReaderIndex();

      // Choose the OpenWire message class and the matching body conversion for the core type.
      switch (messageType) {
         case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
            openWireMsg = new EagerActiveMQBytesMessage(0);
            payload = toAMQMessageBytesType(body, isCompressed);
            break;
         case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
            openWireMsg = new ActiveMQMapMessage();
            payload = toAMQMessageMapType(body, isCompressed);
            break;
         case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
            openWireMsg = new ActiveMQObjectMessage();
            payload = toAMQMessageObjectType(body, isCompressed);
            break;
         case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
            openWireMsg = new ActiveMQStreamMessage();
            payload = toAMQMessageStreamType(body, isCompressed);
            break;
         case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
            openWireMsg = new ActiveMQTextMessage();
            payload = toAMQMessageTextType(body, isCompressed);
            break;
         case org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE:
            openWireMsg = new ActiveMQMessage();
            payload = toAMQMessageDefaultType(body, isCompressed);
            break;
         default:
            throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
      }

      // Standard headers.
      final String jmsType = getObjectProperty(coreMessage, String.class, OpenWireConstants.JMS_TYPE_PROPERTY);
      if (jmsType != null) {
         openWireMsg.setJMSType(jmsType);
      }
      openWireMsg.setPersistent(coreMessage.isDurable());
      openWireMsg.setExpiration(coreMessage.getExpiration());
      openWireMsg.setPriority(coreMessage.getPriority());
      openWireMsg.setTimestamp(coreMessage.getTimestamp());

      final Long brokerInTime = getObjectProperty(coreMessage, Long.class, OpenWireConstants.AMQ_MSG_BROKER_IN_TIME);
      openWireMsg.setBrokerInTime(brokerInTime != null ? brokerInTime : 0L);

      openWireMsg.setCompressed(isCompressed);

      // The AMQ-specific attributes below must be null-checked: messages sent by
      // other clients (e.g. the core client) may not carry them.
      final Long arrivalValue = getObjectProperty(coreMessage, Long.class, OpenWireConstants.AMQ_MSG_ARRIVAL);
      openWireMsg.setArrival(arrivalValue != null ? arrivalValue : 0L);

      final SimpleString brokerPathValue = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_BROKER_PATH);
      if (brokerPathValue != null && brokerPathValue.length() > 0) {
         setAMQMsgBrokerPath(openWireMsg, brokerPathValue.toString());
      }

      final SimpleString clusterPathValue = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_CLUSTER);
      if (clusterPathValue != null && clusterPathValue.length() > 0) {
         setAMQMsgClusterPath(openWireMsg, clusterPathValue.toString());
      }

      final Integer storedCommandId = getObjectProperty(coreMessage, Integer.class, OpenWireConstants.AMQ_MSG_COMMAND_ID);
      openWireMsg.setCommandId(storedCommandId != null ? storedCommandId : -1);

      final SimpleString correlationId = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.JMS_CORRELATION_ID_PROPERTY);
      if (correlationId != null) {
         openWireMsg.setCorrelationId(correlationId.toString());
      }

      final byte[] dataStructureBytes = getObjectProperty(coreMessage, byte[].class, OpenWireConstants.AMQ_MSG_DATASTRUCTURE);
      if (dataStructureBytes != null) {
         setAMQMsgDataStructure(openWireMsg, marshaller, dataStructureBytes);
      }

      // The destination is resolved against the consumer's own OpenWire destination.
      final ActiveMQDestination consumerDestination = consumer.getOpenwireDestination();
      openWireMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, consumerDestination));

      final Object groupIdValue = coreMessage.getGroupID();
      if (groupIdValue != null) {
         openWireMsg.setGroupID(groupIdValue.toString());
      }

      openWireMsg.setGroupSequence(coreMessage.getGroupSequence());

      // The message id may have been stored as a string or as marshalled bytes;
      // when it is neither, one is generated from the server node id and the core message id.
      final Object storedMessageId = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_MESSAGE_ID);
      final MessageId openWireMessageId;
      if (storedMessageId instanceof SimpleString) {
         openWireMessageId = new MessageId(storedMessageId.toString());
      } else if (storedMessageId instanceof byte[]) {
         openWireMessageId = (MessageId) marshaller.unmarshal(new ByteSequence((byte[]) storedMessageId));
      } else {
         // ARTEMIS-3776: because of AMQ-6431 some older clients cannot receive messages over a
         // failover schema once the message id overflows Integer.MAX_VALUE, so the core id is
         // split into a quotient (appended to the producer part) and a remainder (the sequence).
         final String generatedPrefix = "ID:" + serverNodeUUID + ":-1:-1:" + (coreMessage.getMessageID() / Integer.MAX_VALUE);
         openWireMessageId = new MessageId(generatedPrefix, coreMessage.getMessageID() % Integer.MAX_VALUE);
      }

      openWireMsg.setMessageId(openWireMessageId);

      // Original destination: a string form is created with QUEUE_TYPE as the type argument, a byte form is unmarshalled.
      final Object storedOrigDestination = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_ORIG_DESTINATION);
      if (storedOrigDestination instanceof SimpleString) {
         openWireMsg.setOriginalDestination(ActiveMQDestination.createDestination(storedOrigDestination.toString(), QUEUE_TYPE));
      } else if (storedOrigDestination instanceof byte[]) {
         openWireMsg.setOriginalDestination((ActiveMQDestination) marshaller.unmarshal(new ByteSequence((byte[]) storedOrigDestination)));
      }

      // Producer id: an empty string value is ignored.
      final Object storedProducerId = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_PRODUCER_ID);
      if (storedProducerId instanceof SimpleString && ((SimpleString) storedProducerId).length() > 0) {
         openWireMsg.setProducerId(new ProducerId(storedProducerId.toString()));
      } else if (storedProducerId instanceof byte[]) {
         openWireMsg.setProducerId((ProducerId) marshaller.unmarshal(new ByteSequence((byte[]) storedProducerId)));
      }

      // The redelivery counter is the reference's delivery count minus one.
      openWireMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);

      // Reply-to follows the same string / byte[] handling as the original destination.
      final Object storedReplyTo = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_REPLY_TO);
      if (storedReplyTo instanceof SimpleString) {
         openWireMsg.setReplyTo(ActiveMQDestination.createDestination(storedReplyTo.toString(), QUEUE_TYPE));
      } else if (storedReplyTo instanceof byte[]) {
         openWireMsg.setReplyTo((ActiveMQDestination) marshaller.unmarshal(new ByteSequence((byte[]) storedReplyTo)));
      }

      final SimpleString userIdValue = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_USER_ID);
      if (userIdValue != null && userIdValue.length() > 0) {
         openWireMsg.setUserID(userIdValue.toString());
      }

      final Boolean droppableFlag = getObjectProperty(coreMessage, Boolean.class, OpenWireConstants.AMQ_MSG_DROPPABLE);
      if (droppableFlag != null) {
         openWireMsg.setDroppable(droppableFlag);
      }

      final SimpleString dlqFailureCause = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
      if (dlqFailureCause != null) {
         setAMQMsgDlqDeliveryFailureCause(openWireMsg, dlqFailureCause);
      }

      final SimpleString lastValueName = coreMessage.getLastValueProperty();
      if (lastValueName != null) {
         setAMQMsgHdrLastValueName(openWireMsg, lastValueName);
      }

      final Long ingressTime = getObjectProperty(coreMessage, Long.class, HDR_INGRESS_TIMESTAMP);
      if (ingressTime != null) {
         setAMQMsgHdrIngressTimestamp(openWireMsg, ingressTime);
      }

      // Copy the message properties by name via the helper.
      final Set<SimpleString> propertyNames = coreMessage.getPropertyNames();
      if (propertyNames != null) {
         setAMQMsgObjectProperties(openWireMsg, coreMessage, propertyNames);
      }

      // Attach the converted body, if the conversion produced one.
      if (payload != null) {
         openWireMsg.setContent(new ByteSequence(payload));
      }
      return openWireMsg;
   }