in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java [132:189]
/**
 * Emits consumer-group checkpoint records, at most once per checkpoint interval.
 *
 * <p>If the checkpoint interval has not yet elapsed since the last checkpoint, the calling
 * thread sleeps for the remaining time (plus 100 ms of slack) and {@code null} is returned.
 * Otherwise, for every configured consumer group and every source topic, the consume stats
 * are read from both the source and the target cluster and one {@link ConnectRecord}
 * addressed to the checkpoint topic is built from them.
 *
 * <p>A failure for a single (consumer group, topic) pair is logged and that pair is skipped;
 * the remaining pairs are still processed.
 *
 * @return {@code null} when the interval has not elapsed or no consumer groups are configured;
 *         otherwise the (possibly empty) list of checkpoint records that could be built
 * @throws InterruptedException if the thread is interrupted while sleeping or while
 *         collecting consume stats
 */
public List<ConnectRecord> poll() throws InterruptedException {
    // Read the clock once so the interval check and the sleep duration agree with each other;
    // re-reading it could yield a negative duration, which Thread.sleep rejects.
    final long now = System.currentTimeMillis();
    final long checkpointIntervalMs = connectorConfig.getCheckpointIntervalMs();
    if (lastCheckPointTimestamp + checkpointIntervalMs > now) {
        log.info("sleep " + lastCheckPointTimestamp + ", " + checkpointIntervalMs + ", " + now);
        Thread.sleep(lastCheckPointTimestamp + checkpointIntervalMs - now + 100);
        return null;
    }
    log.info("not sleep " + lastCheckPointTimestamp + ", " + checkpointIntervalMs + ", " + now);

    // Consumer groups whose committed progress should be checkpointed.
    String syncGids = connectorConfig.getSyncGids();
    if (StringUtils.isEmpty(syncGids)) {
        lastCheckPointTimestamp = System.currentTimeMillis();
        return null;
    }

    List<ConnectRecord> connectRecords = new LinkedList<>();
    Set<String> srcTopics = ReplicatorConnectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), connectorConfig.getSrcTopicTags()).keySet();
    try {
        String[] syncGidArr = syncGids.split(connectorConfig.GID_SPLITTER);
        for (String consumerGroup : syncGidArr) {
            for (String srcTopic : srcTopics) {
                String srcTopicWithInstanceId = ReplicatorUtils.buildTopicWithNamespace(srcTopic, connectorConfig.getSrcInstanceId());
                String srcConsumerGroupWithInstanceId = ReplicatorUtils.buildConsumergroupWithNamespace(consumerGroup, connectorConfig.getSrcInstanceId());
                try {
                    ConsumeStats srcConsumeStats = srcMqAdminExt.examineConsumeStats(srcConsumerGroupWithInstanceId, srcTopicWithInstanceId);
                    long minSrcLasttimestamp = getMinSrcLasttimestamp(srcConsumeStats);

                    // Fall back to the source topic name when no destination topic is configured.
                    String targetTopic = connectorConfig.getDestTopic();
                    String targetTopicName = StringUtils.isBlank(targetTopic) ? srcTopic : targetTopic;
                    String targetTopicWithInstanceId = ReplicatorUtils.buildTopicWithNamespace(targetTopicName, connectorConfig.getDestInstanceId());

                    // NOTE(review): the target lookup uses the un-namespaced consumer group, while the
                    // source lookup uses the namespaced one - confirm this asymmetry is intended.
                    ConsumeStats targetConsumeStats = targetMqAdminExt.examineConsumeStats(consumerGroup, targetTopicWithInstanceId);
                    // The same helper is applied to the target-side stats.
                    long minDestLasttimestamp = getMinSrcLasttimestamp(targetConsumeStats);

                    RecordPartition recordPartition = ReplicatorUtils.convertToRecordPartition(srcTopic, consumerGroup);
                    RecordOffset recordOffset = ReplicatorUtils.convertToRecordOffset(0L);
                    ConnectRecord connectRecord = new ConnectRecord(recordPartition, recordOffset, System.currentTimeMillis());
                    Struct keyStruct = buildCheckpointKey(srcTopicWithInstanceId, srcConsumerGroupWithInstanceId);
                    Struct valueStruct = buildCheckpointPayload(srcTopicWithInstanceId, srcConsumerGroupWithInstanceId, minSrcLasttimestamp, minDestLasttimestamp);
                    connectRecord.setKeySchema(KEY_SCHEMA);
                    connectRecord.setKey(keyStruct);
                    connectRecord.setSchema(VALUE_SCHEMA_V0);
                    connectRecord.setData(valueStruct);
                    connectRecord.addExtension(TOPIC_KEY, connectorConfig.getCheckpointTopic());
                    connectRecords.add(connectRecord);
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Never swallow interruption: propagate it so the caller can stop the task.
                        throw (InterruptedException) e;
                    }
                    // Best effort: skip this (group, topic) pair and continue with the others.
                    log.error("examineConsumeStats gid : " + consumerGroup + ", topic : " + srcTopic + " error", e);
                }
            }
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Let the interruption rethrown by the inner handler leave the method.
            throw (InterruptedException) e;
        }
        log.error("get syncGids committed offset error, syncGids : " + syncGids, e);
    }
    // Start the next interval from the moment this checkpoint round finished.
    lastCheckPointTimestamp = System.currentTimeMillis();
    return connectRecords;
}