public List poll()

in connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java [132:189]


    public List<ConnectRecord> poll() throws InterruptedException {
        if (lastCheckPointTimestamp + connectorConfig.getCheckpointIntervalMs() > System.currentTimeMillis()) {
            log.info("sleep " + lastCheckPointTimestamp + ", " + connectorConfig.getCheckpointIntervalMs() + ", " + System.currentTimeMillis());
            Thread.sleep(connectorConfig.getCheckpointIntervalMs() - System.currentTimeMillis() + lastCheckPointTimestamp + 100);
            return null;
        }
        log.info("not sleep " + lastCheckPointTimestamp + ", " + connectorConfig.getCheckpointIntervalMs() + ", " + System.currentTimeMillis());
        List<ConnectRecord> connectRecords = new LinkedList<>();
        // pull consumer group's consumer commit offset
        String syncGids = connectorConfig.getSyncGids();
        if (StringUtils.isEmpty(syncGids)) {
            lastCheckPointTimestamp = System.currentTimeMillis();
            return null;
        }
        Set<String> srcTopics = ReplicatorConnectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), connectorConfig.getSrcTopicTags()).keySet();
        try {
            String[] syncGidArr = syncGids.split(connectorConfig.GID_SPLITTER);
            for (String consumerGroup : syncGidArr) {
                for (String srcTopic : srcTopics) {
                    String srcTopicWithInstanceId = ReplicatorUtils.buildTopicWithNamespace(srcTopic, connectorConfig.getSrcInstanceId());
                    String srcConsumerGroupWithInstanceId = ReplicatorUtils.buildConsumergroupWithNamespace(consumerGroup, connectorConfig.getSrcInstanceId());
                    try {
                        ConsumeStats srcConsumeStats = srcMqAdminExt.examineConsumeStats(srcConsumerGroupWithInstanceId, srcTopicWithInstanceId);
                        long minSrcLasttimestamp = getMinSrcLasttimestamp(srcConsumeStats);

                        String targetTopic = connectorConfig.getDestTopic();
                        String targetTopicWithInstanceId;
                        if (StringUtils.isEmpty(targetTopic) || StringUtils.isBlank(targetTopic)) {
                            targetTopicWithInstanceId = ReplicatorUtils.buildTopicWithNamespace(srcTopic, connectorConfig.getDestInstanceId());
                        } else {
                            targetTopicWithInstanceId = ReplicatorUtils.buildTopicWithNamespace(targetTopic, connectorConfig.getDestInstanceId());
                        }
                        ConsumeStats targetConsumeStats = targetMqAdminExt.examineConsumeStats(consumerGroup, targetTopicWithInstanceId);
                        long minDestLasttimestamp = getMinSrcLasttimestamp(targetConsumeStats);

                        RecordPartition recordPartition = ReplicatorUtils.convertToRecordPartition(srcTopic, consumerGroup);
                        RecordOffset recordOffset = ReplicatorUtils.convertToRecordOffset(0L);
                        ConnectRecord connectRecord = new ConnectRecord(recordPartition, recordOffset, System.currentTimeMillis());
                        Struct keyStruct = buildCheckpointKey(srcTopicWithInstanceId, srcConsumerGroupWithInstanceId);
                        Struct valueStruct = buildCheckpointPayload(srcTopicWithInstanceId, srcConsumerGroupWithInstanceId, minSrcLasttimestamp, minDestLasttimestamp);
                        connectRecord.setKeySchema(KEY_SCHEMA);
                        connectRecord.setKey(keyStruct);
                        connectRecord.setSchema(VALUE_SCHEMA_V0);
                        connectRecord.setData(valueStruct);
                        connectRecord.addExtension(TOPIC_KEY, connectorConfig.getCheckpointTopic());
                        connectRecords.add(connectRecord);
                    } catch (Exception e) {
                        log.error("examineConsumeStats gid : " + consumerGroup + ", topic : " + srcTopic + " error", e);
                    }
                }
            }
        } catch (Exception e) {
            log.error("get syncGids committed offset error, syncGids : " + syncGids, e);
        }
        //
        lastCheckPointTimestamp = System.currentTimeMillis();
        return connectRecords;
    }