in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java [497:571]
/**
 * Converts a RocketMQ {@link MessageExt} into a {@link ConnectRecord}.
 *
 * <p>The record carries the message body decoded as a UTF-8 string (string schema), a
 * partition/offset derived from the message's topic, broker name, queue id and queue offset,
 * and a set of extensions built from the message properties:
 * <ul>
 *   <li>keys contained in {@code MQ_SYS_KEYS} are stored with an {@code "MQ-SYS-"} prefix;</li>
 *   <li>keys starting with {@code "connect-ext-"} are stored with that leading prefix removed;</li>
 *   <li>all other keys are copied unchanged.</li>
 * </ul>
 * On top of that the born-source chain, born topic, source born timestamp, source topic,
 * tags, keys, source message id and the destination topic are written as extensions.
 *
 * @param message the message to convert
 * @return the converted record, or {@code null} when the message's born-source chain already
 *         contains the configured destination (the message is skipped)
 * @throws RuntimeException if no destination topic can be resolved and the configured failover
 *                          strategy is not {@code DISMISS}
 */
private ConnectRecord convertToSinkDataEntry(MessageExt message) {
    final String extPrefix = "connect-ext-";

    String topic = message.getTopic();
    Map<String, String> properties = message.getProperties();
    log.debug("srcProperties : " + properties);

    // Prefer the timestamp carried in the message properties; fall back to "now".
    // The lookup is null-safe so a message without a property map does not fail here.
    String connectTimestamp = properties == null
        ? null
        : properties.get(ReplicatorConnectorConfig.CONNECT_TIMESTAMP);
    Long timestamp = StringUtils.isNotEmpty(connectTimestamp)
        ? Long.parseLong(connectTimestamp)
        : System.currentTimeMillis();

    Schema schema = SchemaBuilder.string().build();
    byte[] body = message.getBody();

    String destTopic = swapTopic(topic);
    if (destTopic == null) {
        // Constant-first comparison so an unset strategy is treated as "not DISMISS"
        // instead of raising a NullPointerException.
        if (!FailoverStrategy.DISMISS.equals(connectorConfig.getFailoverStrategy())) {
            throw new RuntimeException("cannot find dest topic.");
        }
        // NOTE(review): with DISMISS the record is still built and its topic extension is
        // null — confirm that downstream handling of a null topic is intended.
        log.error("swap topic got null, topic : " + topic);
    }

    RecordPartition recordPartition =
        ReplicatorUtils.convertToRecordPartition(topic, message.getBrokerName(), message.getQueueId());
    RecordOffset recordOffset = ReplicatorUtils.convertToRecordOffset(message.getQueueOffset());
    String bodyStr = new String(body, StandardCharsets.UTF_8);
    ConnectRecord sinkDataEntry = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);

    KeyValue keyValue = new DefaultKeyValue();
    if (properties != null) {
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String key = entry.getKey();
            if (MQ_SYS_KEYS.contains(key)) {
                keyValue.put("MQ-SYS-" + key, entry.getValue());
            } else if (key.startsWith(extPrefix)) {
                // Strip only the leading prefix; a regex replaceAll would also remove any
                // later occurrence of the marker inside the key.
                keyValue.put(key.substring(extPrefix.length()), entry.getValue());
            } else {
                keyValue.put(key, entry.getValue());
            }
        }
    }

    // Born source is a list of entries each terminated by ","; check whether the
    // destination is already part of it.
    String bornSource = keyValue.getString(REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION);
    // skip msg born from destination
    if (bornSource != null && bornSource.contains(connectorConfig.generateDestinationString() + ",")) {
        // Read the counter once so the logged value is the one that triggered the log line.
        long skipped = circleReplicateCounter.incrementAndGet();
        if (skipped % 100 == 0) {
            log.warn("skip " + skipped + " message have replicated from "
                + connectorConfig.generateDestinationString()
                + ", bornSource : " + bornSource + ", message : " + message);
        }
        return null;
    }

    // save all source in born source, format is srcCloud "_" srcCluster "_" srcRegion ",";
    if (StringUtils.isEmpty(bornSource)) {
        bornSource = "";
    }
    keyValue.put(REPLICATOR_BORN_SOURCE_CLOUD_CLUSTER_REGION, bornSource + connectorConfig.generateSourceString() + ",");

    // save born topic if empty; format is srcInstanceId "%" srcTopicTags
    String bornTopic = keyValue.getString(REPLICATOR_BORE_INSTANCEID_TOPIC);
    if (StringUtils.isEmpty(bornTopic)) {
        keyValue.put(REPLICATOR_BORE_INSTANCEID_TOPIC, connectorConfig.generateFullSourceTopicTags());
    }

    // put src born timestamp
    keyValue.put(REPLICATOR_BORN_SOURCE_TIMESTAMP, message.getBornTimestamp());
    // put src topic
    keyValue.put(REPLICATOR_SRC_TOPIC_PROPERTY_KEY, topic);
    // save tags
    if (StringUtils.isNotBlank(message.getTags())) {
        keyValue.put(MessageConst.PROPERTY_TAGS, message.getTags());
    }
    // save keys
    if (StringUtils.isNotBlank(message.getKeys())) {
        keyValue.put(MessageConst.PROPERTY_KEYS, message.getKeys());
    }
    // save src messageid
    keyValue.put(REPLICATOR_SRC_MESSAGE_ID, message.getMsgId());

    log.debug("addExtension : " + keyValue.keySet());
    sinkDataEntry.addExtension(keyValue);
    sinkDataEntry.addExtension(ReplicatorUtils.TOPIC, destTopic);
    return sinkDataEntry;
}