connectors/rocketmq-connect-jms/src/main/java/org/apache/rocketmq/connect/jms/connector/BaseJmsSourceTask.java [111:142]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public String getMessageContent(Message message) throws JMSException {
        String data = null;
        if (message instanceof TextMessage) {
            data = ((TextMessage) message).getText();
        } else if (message instanceof ObjectMessage) {
            data = JSON.toJSONString(((ObjectMessage) message).getObject());
        } else if (message instanceof BytesMessage) {
            BytesMessage bytesMessage = (BytesMessage) message;
            data = bytesMessage.toString();
        } else if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            Map<String, Object> map = new HashMap<>();
            Enumeration<Object> names = mapMessage.getMapNames();
            while (names.hasMoreElements()) {
                String name = names.nextElement().toString();
                map.put(name, mapMessage.getObject(name));
            }
            data = JSON.toJSONString(map);
        } else if (message instanceof StreamMessage) {
            StreamMessage streamMessage = (StreamMessage) message;
            ByteArrayOutputStream bis = new ByteArrayOutputStream();
            byte[] by = new byte[1024];
            int i = 0;
            while ((i = streamMessage.readBytes(by)) != -1) {
                bis.write(by, 0, i);
            }
            data = bis.toString();
        } else {
            throw new RuntimeException("message type exception");
        }
        return data;
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



connectors/rocketmq-connect-activemq/src/main/java/org/apache/rocketmq/connect/activemq/connector/ActivemqSourceTask.java [104:136]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    public String getMessageContent(Message message) throws JMSException {
        String data = null;
        if (message instanceof TextMessage) {
            data = ((TextMessage) message).getText();
        } else if (message instanceof ObjectMessage) {
            data = JSON.toJSONString(((ObjectMessage) message).getObject());
        } else if (message instanceof BytesMessage) {
            BytesMessage bytesMessage = (BytesMessage) message;
            data = bytesMessage.toString();
        } else if (message instanceof MapMessage) {
            MapMessage mapMessage = (MapMessage) message;
            Map<String, Object> map = new HashMap<>();
            Enumeration<Object> names = mapMessage.getMapNames();
            while (names.hasMoreElements()) {
                String name = names.nextElement().toString();
                map.put(name, mapMessage.getObject(name));
            }
            data = JSON.toJSONString(map);
        } else if (message instanceof StreamMessage) {
            StreamMessage streamMessage = (StreamMessage) message;
            ByteArrayOutputStream bis = new ByteArrayOutputStream();
            byte[] by = new byte[1024];
            int i = 0;
            while ((i = streamMessage.readBytes(by)) != -1) {
                bis.write(by, 0, i);
            }
            data = bis.toString();
        } else {
            // The exception is printed and does not need to be written as a DataConnectException
            throw new RuntimeException("message type exception");
        }
        return data;
    }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



