public void subscribeTopic()

in src/main/java/com/aliyun/dts/subscribe/clients/recordfetcher/ConsumerWrap.java [97:120]


        /**
         * Subscribes the wrapped consumer to the topic of {@code topicPartition} and installs a
         * rebalance listener that positions the fetch offset once that partition is assigned.
         *
         * <p>On every assignment the listener hands the assigned partitions to
         * {@code consumerContext}; if the context then reports no valid partitions, a warning is
         * logged. When the assignment includes {@code topicPartition}, a checkpoint is pulled from
         * {@code streamCheckpoint} and passed to {@code setFetchOffsetByTimestamp}.
         *
         * @param topicPartition the partition whose topic is subscribed and whose offset is set
         *                       when it shows up in an assignment
         * @param streamCheckpoint supplier queried for the checkpoint each time the target
         *                         partition is assigned (not queried at subscribe time)
         * @param isCheckpointNotExistThrowException forwarded unchanged to
         *                                           {@code setFetchOffsetByTimestamp}; presumably
         *                                           selects throwing when the checkpoint cannot be
         *                                           resolved -- confirm against that method
         */
        public void subscribeTopic(TopicPartition topicPartition, Supplier<Checkpoint> streamCheckpoint, boolean isCheckpointNotExistThrowException) {
            final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Revocation is only reported; no state is changed here.
                    final String revoked = StringUtils.join(partitions, ",");
                    log.info("RecordFetcher consumer: partition revoked for [{}]", revoked);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    final String assigned = StringUtils.join(partitions, ",");
                    log.info("RecordFetcher consumer: partition assigned for [{}]", assigned);

                    // Publish the new assignment to the shared context before inspecting it.
                    consumerContext.setTopicPartitions(partitions);
                    final boolean hasValidPartitions = consumerContext.hasValidTopicPartitions();
                    if (!hasValidPartitions) {
                        log.warn("In subscribe mode, recordFetcher consumer dose not assigned any partition, probably this client is a backup...");
                    }

                    // Nothing to position unless the target partition is part of this assignment.
                    if (!partitions.contains(topicPartition)) {
                        return;
                    }

                    // The checkpoint is resolved lazily, at assignment time.
                    final Checkpoint checkpoint = streamCheckpoint.get();
                    setFetchOffsetByTimestamp(topicPartition, checkpoint, isCheckpointNotExistThrowException);
                    log.info("RecordFetcher consumer:  subscribe for [{}] with checkpoint [{}] start", topicPartition, checkpoint);
                }
            };

            consumer.subscribe(Arrays.asList(topicPartition.topic()), rebalanceListener);
        }