private ConnectRecord convertToSinkDataEntry()

in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java [497:571]


    private ConnectRecord convertToSinkDataEntry(MessageExt message) {
        String topic = message.getTopic();
        Map<String, String> properties = message.getProperties();
        log.debug("srcProperties : " + properties);
        Long timestamp;
        ConnectRecord sinkDataEntry = null;

        String connectTimestamp = properties.get(ReplicatorConnectorConfig.CONNECT_TIMESTAMP);
        timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.parseLong(connectTimestamp) : System.currentTimeMillis();
        Schema schema = SchemaBuilder.string().build();
        byte[] body = message.getBody();
        String destTopic = swapTopic(topic);
        if (destTopic == null) {
            if (!connectorConfig.getFailoverStrategy().equals(FailoverStrategy.DISMISS)) {
                throw new RuntimeException("cannot find dest topic.");
            } else {
                log.error("swap topic got null, topic : " + topic);
            }
        }
        RecordPartition recordPartition = ReplicatorUtils.convertToRecordPartition(topic, message.getBrokerName(), message.getQueueId());
        RecordOffset recordOffset = ReplicatorUtils.convertToRecordOffset(message.getQueueOffset());
        String bodyStr = new String(body, StandardCharsets.UTF_8);
        sinkDataEntry = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);
        KeyValue keyValue = new DefaultKeyValue();
        if (org.apache.commons.collections.MapUtils.isNotEmpty(properties)) {
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                if (MQ_SYS_KEYS.contains(entry.getKey())) {
                    keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
                } else if (entry.getKey().startsWith("connect-ext-")) {
                    keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue());
                } else {
                    keyValue.put(entry.getKey(), entry.getValue());
                }
            }
        }
        // check bornSource have destinationStr + ","
        String bornSource = keyValue.getString(REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION);
        // skip msg born from destination
        if (bornSource != null && bornSource.contains(connectorConfig.generateDestinationString() + ",")) {
            if (circleReplicateCounter.incrementAndGet() % 100 == 0) {
                log.warn("skip " + circleReplicateCounter.get() + " message have replicated from " + connectorConfig.generateDestinationString() + ", bornSource : " + bornSource + ", message : " + message);
            }
            return null;
        }
        // save all source in born source, format is srcCloud "_" srcCluster "_" srcRegion ",";
        if (StringUtils.isEmpty(bornSource)) {
            bornSource = "";
        }
        keyValue.put(REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION, bornSource + connectorConfig.generateSourceString() + ",");
        String bornTopic = keyValue.getString(REPLICATOR_BORE_INSTANCEID_TOPIC);
        // save born topic if empty
        if (StringUtils.isEmpty(bornTopic)) {
            // save full topic, format is srcInstanceId "%" srcTopicTags;
            keyValue.put(REPLICATOR_BORE_INSTANCEID_TOPIC, connectorConfig.generateFullSourceTopicTags());
        }
        // put src born timestamp
        keyValue.put(REPLICATOR_BORN_SOURCE_TIMESTAMP, message.getBornTimestamp());
        // put src topic
        keyValue.put(REPLICATOR_SRC_TOPIC_PROPERTY_KEY, topic);
        // save tags
        if (StringUtils.isNotBlank(message.getTags())) {
            keyValue.put(MessageConst.PROPERTY_TAGS, message.getTags());
        }
        // save keys
        if (StringUtils.isNotBlank(message.getKeys())) {
            keyValue.put(MessageConst.PROPERTY_KEYS, message.getKeys());
        }
        // save src messageid
        keyValue.put(REPLICATOR_SRC_MESSAGE_ID, message.getMsgId());
        log.debug("addExtension : " + keyValue.keySet());
        sinkDataEntry.addExtension(keyValue);
        sinkDataEntry.addExtension(ReplicatorUtils.TOPIC, destTopic);

        return sinkDataEntry;
    }