public void process()

in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java [70:105]


        /**
         * Serializes the given record and publishes it to the sink topic.
         *
         * <p>Nothing is sent when {@code data} is {@code null} or when the serializer
         * yields a {@code null} or empty payload. When a key is present, its hex form
         * is set as the message key and used with {@link SelectMessageQueueByHash} to
         * pick the target queue; otherwise the message is sent without a selector.
         *
         * @param data the record to publish; {@code null} is ignored
         * @throws Throwable if serialization or sending fails
         */
        public void process(T data) throws Throwable {
            if (data == null) {
                return;
            }

            final byte[] body = this.serializer.serialize(key, data);
            // RocketMQ currently does not support sending a message whose body is null,
            // so records without a payload are dropped here.
            if (body == null || body.length == 0) {
                return;
            }

            final boolean keyed = this.key != null;
            final Message outgoing = new Message(this.topicName, body);

            String routingKey = null;
            if (keyed) {
                // The real key travels in the body; this hex key only routes records
                // with the same key into the same queue.
                routingKey = Utils.toHexString(this.key);
                outgoing.setKeys(routingKey);
                outgoing.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME, this.key.getClass().getName());
            }

            outgoing.putUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME, data.getClass().getName());

            // Only messages headed to a shuffle topic carry the source timestamp.
            if (this.topicName.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
                outgoing.putUserProperty(Constant.SOURCE_TIMESTAMP, String.valueOf(this.context.getDataTime()));
            }

            if (keyed) {
                producer.send(outgoing, new SelectMessageQueueByHash(), routingKey);
            } else {
                producer.send(outgoing);
            }
        }