public static ICoreMessage toCore()

in artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java [116:243]


   public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools, Header header, MessageAnnotations annotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) throws ConversionException {
      final long messageId = message.getMessageID();
      final Symbol contentType = properties != null ? properties.getContentType() : null;
      final String contentTypeString = contentType != null ? contentType.toString() : null;

      CoreMessageWrapper result;

      if (body == null) {
         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
            result = createObjectMessage(messageId, coreMessageObjectPools);
         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType) || isContentType(null, contentType)) {
            result = createBytesMessage(messageId, coreMessageObjectPools);
         } else {
            Charset charset = getCharsetForTextualContent(contentTypeString);
            if (charset != null) {
               result = createTextMessage(messageId, coreMessageObjectPools);
            } else {
               result = createMessage(messageId, coreMessageObjectPools);
            }
         }

         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
      } else if (body instanceof Data data) {
         Binary payload = data.getValue();

         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
            result = createObjectMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
         } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType)) {
            result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
         } else {
            Charset charset = getCharsetForTextualContent(contentTypeString);
            if (StandardCharsets.UTF_8.equals(charset)) {
               ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());

               try {
                  CharBuffer chars = charset.newDecoder().decode(buf);
                  result = createTextMessage(messageId, String.valueOf(chars), coreMessageObjectPools);
               } catch (CharacterCodingException e) {
                  result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
               }
            } else {
               result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
            }
         }

         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
      } else if (body instanceof AmqpSequence amqpSequence) {
         CoreStreamMessageWrapper m = createStreamMessage(messageId, coreMessageObjectPools);
         for (Object item : amqpSequence.getValue()) {
            m.writeObject(item);
         }

         result = m;
         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
      } else if (body instanceof AmqpValue amqpValue) {
         Object value = amqpValue.getValue();
         if (value == null || value instanceof String) {
            result = createTextMessage(messageId, (String) value, coreMessageObjectPools);

            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
         } else if (value instanceof Binary binary) {

            if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
               result = createObjectMessage(messageId, binary, coreMessageObjectPools);
            } else {
               result = createBytesMessage(messageId, binary.getArray(), binary.getArrayOffset(), binary.getLength(), coreMessageObjectPools);
            }

            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
         } else if (value instanceof List) {
            CoreStreamMessageWrapper m = createStreamMessage(messageId, coreMessageObjectPools);
            for (Object item : (List<Object>) value) {
               m.writeObject(item);
            }
            result = m;
            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
         } else if (value instanceof Map) {
            result = createMapMessage(messageId, (Map<String, Object>) value, coreMessageObjectPools);
            result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
         } else {
            ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
            try {
               TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
               TLSEncode.getEncoder().writeObject(body);
               result = createBytesMessage(messageId, buf.array(), 0, buf.writerIndex(), coreMessageObjectPools);
            } finally {
               buf.release();
               TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
            }
         }
      } else {
         throw new RuntimeException("Unexpected body type: " + body.getClass());
      }

      // Initialize the JMS expiration with the value calculated during the scan of the AMQP message
      // to avoid a different value for each conversion based on System.currentTimeMillis() and ttl.
      result.setJMSExpiration(message.getExpiration());

      processHeader(result, header);
      processMessageAnnotations(result, annotations);
      processApplicationProperties(result, applicationProperties);
      processProperties(result, properties, annotations);
      processFooter(result, footer);
      processExtraProperties(result, message.getExtraProperties());

      // If the JMS expiration has not yet been set...
      if (header != null && result.getExpiration() == 0) {
         // Then lets try to set it based on the message TTL.
         long ttl = MESSAGE_DEFAULT_TIME_TO_LIVE;
         if (header.getTtl() != null) {
            ttl = header.getTtl().longValue();
         }

         if (ttl == 0) {
            result.setJMSExpiration(0);
         } else {
            result.setJMSExpiration(System.currentTimeMillis() + ttl);
         }
      }

      result.getInnerMessage().setDurable(message.isDurable());
      result.getInnerMessage().setPriority(message.getPriority());
      result.getInnerMessage().setAddress(message.getAddressSimpleString());
      result.getInnerMessage().setRoutingType(message.getRoutingType());
      result.encode();

      return result.getInnerMessage();
   }