in core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java [70:105]
public void process(T data) throws Throwable {
if (data != null) {
byte[] value = this.serializer.serialize(key, data);
if (value == null || value.length == 0) {
//目前RocketMQ不支持发送body为null的消息;
return;
}
Message message;
if (this.key == null) {
message = new Message(this.topicName, value);
message.putUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME, data.getClass().getName());
if (this.topicName.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
message.putUserProperty(Constant.SOURCE_TIMESTAMP, String.valueOf(this.context.getDataTime()));
}
producer.send(message);
} else {
message = new Message(this.topicName, value);
String hexKey = Utils.toHexString(this.key);
//the real key is in the body, this key is used to route the same key into the same queue.
message.setKeys(hexKey);
message.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME, this.key.getClass().getName());
message.putUserProperty(Constant.SHUFFLE_VALUE_CLASS_NAME, data.getClass().getName());
if (this.topicName.contains(Constant.SHUFFLE_TOPIC_SUFFIX)) {
message.putUserProperty(Constant.SOURCE_TIMESTAMP, String.valueOf(this.context.getDataTime()));
}
producer.send(message, new SelectMessageQueueByHash(), hexKey);
}
}
}