in connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java [217:308]
/**
 * Sends one canal message to {@code topicName}, fanning it out across partitions when the
 * destination configures a partition hash.
 *
 * <p>Flow:
 * <ol>
 * <li>Resolve the partition count from the dynamic per-topic setting, falling back to
 * {@code destination.getPartitionsNum()}.</li>
 * <li>If {@code pulsarAdmin} is set, the count is positive and {@code PRODUCERS} has no entry for
 * the topic, call {@code createMultipleTopic}.</li>
 * <li>Build the row data once, then send either raw messages or flat messages depending on
 * {@code mqProperties.isFlatMessage()}.</li>
 * </ol>
 *
 * <p>With a non-empty partition hash, one send task per non-empty partition is submitted through an
 * {@code ExecutorTemplate} over {@code sendPartitionExecutor}, and {@code waitForResult()} is called
 * before returning. Without a partition hash, the whole payload is sent on the calling thread to
 * {@code destination.getPartition()}, or partition 0 when that is {@code null}.
 *
 * @param destination MQ destination settings (partition count, partition hash, default partition)
 * @param topicName   topic to publish to
 * @param message     canal message to publish
 */
public void send(final MQDestination destination, String topicName,
                 com.alibaba.otter.canal.protocol.Message message) {
    // Resolve the partition count of the current topic: dynamic per-topic value first,
    // then the destination-level value.
    Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
        destination.getDynamicTopicPartitionNum());
    if (partitionNum == null) {
        partitionNum = destination.getPartitionsNum();
    }
    // Create the multi-partition topic when nothing is registered for it in PRODUCERS yet.
    if (pulsarAdmin != null && partitionNum != null && partitionNum > 0 && PRODUCERS.get(topicName) == null) {
        createMultipleTopic(topicName, partitionNum);
    }

    final String partitionHash = destination.getPartitionHash();
    final boolean hashPartitioned = partitionHash != null && !partitionHash.isEmpty();

    ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
    // Build the row data (uses buildExecutor).
    MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);

    if (!mqProperties.isFlatMessage()) {
        if (hashPartitioned) {
            // Partition the batch exactly once. Previously this ran inside a loop over every
            // entry, so each partition message was re-sent once per entry in the batch.
            com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
                message.getId(),
                partitionNum,
                partitionHash,
                mqProperties.isDatabaseHash());
            for (int i = 0; i < messages.length; i++) {
                final com.alibaba.otter.canal.protocol.Message partitionMessage = messages[i];
                // Skip empty slots, mirroring the null guard of the flat-message path below.
                if (partitionMessage == null) {
                    continue;
                }
                final int partition = i;
                template.submit(() -> {
                    sendMessage(topicName, partition, partitionMessage);
                });
            }
            // Wait for every partition send so the method does not return while sends are
            // still in flight (same as the flat-message path).
            template.waitForResult();
        } else {
            // No partition hash: use the configured partition, defaulting to 0.
            final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
            sendMessage(topicName, partition, message);
        }
    } else {
        // Convert the row data into flat messages.
        List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
        if (hashPartitioned) {
            // Per-partition merge buckets. They are grown on demand from the arrays returned by
            // messagePartition, so a null partitionNum is never unboxed here.
            List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
            for (FlatMessage flatMessage : flatMessages) {
                FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                    partitionNum,
                    partitionHash,
                    mqProperties.isDatabaseHash());
                for (int i = 0; i < partitionFlatMessage.length; i++) {
                    // Null check, see issue #3267.
                    if (partitionFlatMessage[i] == null) {
                        continue;
                    }
                    while (partitionFlatMessages.size() <= i) {
                        partitionFlatMessages.add(new ArrayList<>());
                    }
                    partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
                }
            }
            for (int i = 0; i < partitionFlatMessages.size(); i++) {
                final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
                if (flatMessagePart.isEmpty()) {
                    continue;
                }
                final int partition = i;
                template.submit(() -> {
                    // Send this partition's messages as one batch.
                    sendMessage(topicName, partition, flatMessagePart);
                });
            }
            // Wait for the results of all partitions.
            template.waitForResult();
        } else {
            // No partition hash: use the configured partition, defaulting to 0.
            final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
            sendMessage(topicName, partition, flatMessages);
        }
    }
}