in connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java [185:292]
/**
 * Publishes one canal message to the given RocketMQ topic.
 * <p>
 * The payload is sent either as a serialized canal message or, when
 * {@code mqProperties.isFlatMessage()} is set, as a batch of JSON-encoded flat messages.
 * If the destination defines a partition hash, the payload is split per partition and the
 * partitions are sent in parallel; this method returns only after every partition task has
 * been waited for. Otherwise everything goes to the destination's fixed partition
 * (index 0 when none is configured).
 *
 * @param destination routing configuration (partition count, partition hash, fixed partition)
 * @param topicName   target topic
 * @param message     canal message to publish
 */
public void send(final MQDestination destination, String topicName,
                 com.alibaba.otter.canal.protocol.Message message) {
    // Partition count for this topic: dynamic per-topic setting first ...
    Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
        destination.getDynamicTopicPartitionNum());
    // ... then the topic's queue count ...
    if (partitionNum == null) {
        partitionNum = getTopicDynamicQueuesSize(destination.getEnableDynamicQueuePartition(), topicName);
    }
    // ... and finally the destination's static setting.
    // NOTE(review): this may still be null if the destination has no partition count configured.
    if (partitionNum == null) {
        partitionNum = destination.getPartitionsNum();
    }

    if (mqProperties.isFlatMessage()) {
        sendAsFlatMessages(destination, topicName, message, partitionNum);
    } else {
        sendAsSerializedMessage(destination, topicName, message, partitionNum);
    }
}

/** Sends the canal message in its serialized (non-flat) form, split by partition hash when configured. */
private void sendAsSerializedMessage(MQDestination destination, String topicName,
                                     com.alibaba.otter.canal.protocol.Message message,
                                     Integer partitionNum) {
    if (!hasPartitionHash(destination)) {
        final int partition = fixedPartition(destination);
        sendMessage(toSerializedRocketMessage(topicName, message), partition);
        return;
    }

    // Build the row data concurrently ...
    MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
    // ... then partition it serially.
    com.alibaba.otter.canal.protocol.Message[] partitions = MQMessageUtils.messagePartition(datas,
        message.getId(),
        partitionNum,
        destination.getPartitionHash(),
        mqProperties.isDatabaseHash());

    ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
    for (int i = 0; i < partitions.length; i++) {
        final com.alibaba.otter.canal.protocol.Message dataPartition = partitions[i];
        if (dataPartition == null) {
            // Nothing was routed to this partition.
            continue;
        }
        final int index = i;
        template.submit(() -> {
            Message data = toSerializedRocketMessage(topicName, dataPartition);
            sendMessage(data, index);
        });
    }
    // Wait until every partition has been sent.
    template.waitForResult();
}

/** Sends the canal message as JSON flat messages, split by partition hash when configured. */
private void sendAsFlatMessages(MQDestination destination, String topicName,
                                com.alibaba.otter.canal.protocol.Message message,
                                Integer partitionNum) {
    // Build the row data concurrently ...
    MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
    // ... then convert it serially.
    List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());

    if (!hasPartitionHash(destination)) {
        final int partition = fixedPartition(destination);
        // Batch send.
        sendMessage(toFlatRocketMessages(topicName, flatMessages), partition);
        return;
    }

    // Per-partition merge buckets, indexed by partition. They are grown on demand from the
    // indices the partitioner actually returns, so a null partitionNum is never unboxed here
    // and an array longer than partitionNum cannot cause an out-of-bounds access.
    List<List<FlatMessage>> buckets = new ArrayList<>();
    for (FlatMessage flatMessage : flatMessages) {
        FlatMessage[] parts = MQMessageUtils.messagePartition(flatMessage,
            partitionNum,
            destination.getPartitionHash(),
            mqProperties.isDatabaseHash());
        for (int i = 0; i < parts.length; i++) {
            // Null check added for issue #3267.
            if (parts[i] == null) {
                continue;
            }
            while (buckets.size() <= i) {
                buckets.add(new ArrayList<>());
            }
            buckets.get(i).add(parts[i]);
        }
    }

    ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
    for (int i = 0; i < buckets.size(); i++) {
        final List<FlatMessage> bucket = buckets.get(i);
        if (bucket.isEmpty()) {
            continue;
        }
        final int index = i;
        template.submit(() -> {
            List<Message> messages = toFlatRocketMessages(topicName, bucket);
            // Batch send.
            sendMessage(messages, index);
        });
    }
    // Wait for the results of all partitions.
    template.waitForResult();
}

/** Returns whether the destination configures a non-empty partition hash. */
private boolean hasPartitionHash(MQDestination destination) {
    return destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty();
}

/** Returns the destination's configured partition, or 0 when none is set. */
private int fixedPartition(MQDestination destination) {
    return destination.getPartition() != null ? destination.getPartition() : 0;
}

/** Wraps a serialized canal message in a RocketMQ message carrying the configured tag. */
private Message toSerializedRocketMessage(String topicName,
                                          com.alibaba.otter.canal.protocol.Message canalMessage) {
    return new Message(topicName,
        ((RocketMQProducerConfig) this.mqProperties).getTag(),
        CanalMessageSerializerUtil.serializer(canalMessage, mqProperties.isFilterTransactionEntry()));
}

/** Encodes each flat message as JSON (nulls written) in a RocketMQ message carrying the configured tag. */
private List<Message> toFlatRocketMessages(String topicName, List<FlatMessage> flatMessages) {
    return flatMessages.stream()
        .map(flatMessage -> new Message(topicName,
            ((RocketMQProducerConfig) this.mqProperties).getTag(),
            JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject)))
        .collect(Collectors.toList());
}