public void send()

in connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java [217:308]


    public void send(final MQDestination destination, String topicName,
                     com.alibaba.otter.canal.protocol.Message message) {

        // 获取当前topic的分区数
        Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
            destination.getDynamicTopicPartitionNum());
        if (partitionNum == null) {
            partitionNum = destination.getPartitionsNum();
        }
        // 创建多分区topic
        if (pulsarAdmin != null && partitionNum != null && partitionNum > 0 && PRODUCERS.get(topicName) == null) {
            createMultipleTopic(topicName, partitionNum);
        }

        ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
        // 并发构造
        MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
        if (!mqProperties.isFlatMessage()) {
            // 动态计算目标分区
            if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                for (MQMessageUtils.EntryRowData r : datas) {
                    CanalEntry.Entry entry = r.entry;
                    if (null == entry) {
                        continue;
                    }
                    // 串行分区
                    com.alibaba.otter.canal.protocol.Message[] messages = MQMessageUtils.messagePartition(datas,
                        message.getId(),
                        partitionNum,
                        destination.getPartitionHash(),
                        mqProperties.isDatabaseHash());
                    // 发送
                    int len = messages.length;
                    for (int i = 0; i < len; i++) {
                        final int partition = i;
                        com.alibaba.otter.canal.protocol.Message m = messages[i];
                        template.submit(() -> {
                            sendMessage(topicName, partition, m);
                        });
                    }
                }
            } else {
                // 默认分区
                final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                sendMessage(topicName, partition, message);
            }
        } else {
            // 串行分区
            List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());

            // 初始化分区合并队列
            if (destination.getPartitionHash() != null && !destination.getPartitionHash().isEmpty()) {
                List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
                int len = partitionNum;
                for (int i = 0; i < len; i++) {
                    partitionFlatMessages.add(new ArrayList<>());
                }

                for (FlatMessage flatMessage : flatMessages) {
                    FlatMessage[] partitionFlatMessage = MQMessageUtils.messagePartition(flatMessage,
                        partitionNum,
                        destination.getPartitionHash(),
                        mqProperties.isDatabaseHash());
                    int length = partitionFlatMessage.length;
                    for (int i = 0; i < length; i++) {
                        // 增加null判断,issue #3267
                        if (partitionFlatMessage[i] != null) {
                            partitionFlatMessages.get(i).add(partitionFlatMessage[i]);
                        }
                    }
                }

                for (int i = 0; i < len; i++) {
                    final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
                    if (flatMessagePart != null && flatMessagePart.size() > 0) {
                        final int partition = i;
                        template.submit(() -> {
                            // 批量发送
                            sendMessage(topicName, partition, flatMessagePart);
                        });
                    }
                }

                // 批量等所有分区的结果
                template.waitForResult();
            } else {
                // 默认分区
                final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                sendMessage(topicName, partition, flatMessages);
            }
        }
    }