public void send()

in connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java [185:292]


    /**
     * Publishes one canal message batch to the given RocketMQ topic.
     * <p>
     * The partition count is looked up from {@code MQMessageUtils.parseDynamicTopicPartition}, then from
     * {@code getTopicDynamicQueuesSize}, then from {@code destination.getPartitionsNum()}; the first non-null
     * value wins. Depending on {@code mqProperties.isFlatMessage()} the batch is sent either as serialized canal
     * protocol messages or as JSON-encoded flat messages. When the destination has a non-empty partition hash the
     * data is split per partition and one send task per non-empty partition is submitted through an
     * {@code ExecutorTemplate}; otherwise everything is sent to the destination's fixed partition (0 when unset).
     *
     * @param destination MQ destination settings (partition hash, partition count, fixed partition, ...)
     * @param topicName   topic the messages are published to
     * @param message     canal message batch to publish
     */
    public void send(final MQDestination destination, String topicName,
                     com.alibaba.otter.canal.protocol.Message message) {
        // Partition count configured for this (possibly dynamic) topic.
        Integer partitionNum = MQMessageUtils.parseDynamicTopicPartition(topicName,
            destination.getDynamicTopicPartitionNum());

        // Otherwise use the topic's queue count as the partition count.
        if (partitionNum == null) {
            partitionNum = getTopicDynamicQueuesSize(destination.getEnableDynamicQueuePartition(), topicName);
        }

        // Last resort: the destination's own setting.
        // NOTE(review): this value is not null-checked, so partitionNum may still be null below - confirm
        // whether getPartitionsNum() can return null.
        if (partitionNum == null) {
            partitionNum = destination.getPartitionsNum();
        }

        final boolean hashPartitioned = destination.getPartitionHash() != null
                                        && !destination.getPartitionHash().isEmpty();

        if (!mqProperties.isFlatMessage()) {
            if (hashPartitioned) {
                sendPartitionedProtocolMessages(destination, topicName, message, partitionNum);
            } else {
                final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
                Message data = newTaggedMessage(topicName,
                    CanalMessageSerializerUtil.serializer(message, mqProperties.isFilterTransactionEntry()));
                sendMessage(data, partition);
            }
            return;
        }

        // Build concurrently.
        MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
        // Convert serially.
        List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
        if (hashPartitioned) {
            sendPartitionedFlatMessages(destination, topicName, flatMessages, partitionNum);
        } else {
            final int partition = destination.getPartition() != null ? destination.getPartition() : 0;
            // Batch send.
            sendMessage(buildFlatRocketMessages(topicName, flatMessages), partition);
        }
    }

    /** Splits a canal protocol message by partition hash and sends each non-null partition as its own task. */
    private void sendPartitionedProtocolMessages(final MQDestination destination, final String topicName,
                                                 com.alibaba.otter.canal.protocol.Message message,
                                                 Integer partitionNum) {
        // Build concurrently.
        MQMessageUtils.EntryRowData[] datas = MQMessageUtils.buildMessageData(message, buildExecutor);
        // Partition serially.
        com.alibaba.otter.canal.protocol.Message[] partitions = MQMessageUtils.messagePartition(datas,
            message.getId(),
            partitionNum,
            destination.getPartitionHash(),
            mqProperties.isDatabaseHash());

        ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
        for (int i = 0; i < partitions.length; i++) {
            final com.alibaba.otter.canal.protocol.Message dataPartition = partitions[i];
            if (dataPartition == null) {
                continue;
            }
            final int index = i;
            template.submit(() -> {
                Message data = newTaggedMessage(topicName,
                    CanalMessageSerializerUtil.serializer(dataPartition, mqProperties.isFilterTransactionEntry()));
                sendMessage(data, index);
            });
        }
        // Wait until every partition has been sent.
        template.waitForResult();
    }

    /** Groups flat messages by target partition and batch-sends each non-empty group as its own task. */
    private void sendPartitionedFlatMessages(final MQDestination destination, final String topicName,
                                             List<FlatMessage> flatMessages, Integer partitionNum) {
        // Per-partition merge queues. They are grown on demand from the indices messagePartition actually
        // returns instead of being pre-sized with partitionNum: unboxing a null partitionNum in a loop bound
        // would throw a NullPointerException here, while the non-flat path never unboxes it locally.
        List<List<FlatMessage>> partitionFlatMessages = new ArrayList<>();
        for (FlatMessage flatMessage : flatMessages) {
            FlatMessage[] parts = MQMessageUtils.messagePartition(flatMessage,
                partitionNum,
                destination.getPartitionHash(),
                mqProperties.isDatabaseHash());
            for (int i = 0; i < parts.length; i++) {
                // Skip null slots, see issue #3267.
                if (parts[i] == null) {
                    continue;
                }
                while (partitionFlatMessages.size() <= i) {
                    partitionFlatMessages.add(new ArrayList<>());
                }
                partitionFlatMessages.get(i).add(parts[i]);
            }
        }

        ExecutorTemplate template = new ExecutorTemplate(sendPartitionExecutor);
        for (int i = 0; i < partitionFlatMessages.size(); i++) {
            final List<FlatMessage> flatMessagePart = partitionFlatMessages.get(i);
            if (flatMessagePart.isEmpty()) {
                continue;
            }
            final int index = i;
            template.submit(() -> {
                // Batch send.
                sendMessage(buildFlatRocketMessages(topicName, flatMessagePart), index);
            });
        }

        // Wait for the results of all partitions.
        template.waitForResult();
    }

    /** Converts flat messages to RocketMQ messages whose bodies are their JSON bytes (nulls written). */
    private List<Message> buildFlatRocketMessages(final String topicName, List<FlatMessage> flatMessages) {
        return flatMessages.stream()
            .map(flatMessage -> newTaggedMessage(topicName,
                JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject)))
            .collect(Collectors.toList());
    }

    /** Creates a RocketMQ message for the topic carrying the tag from the producer configuration. */
    private Message newTaggedMessage(String topicName, byte[] body) {
        return new Message(topicName, ((RocketMQProducerConfig) this.mqProperties).getTag(), body);
    }