in src/main/java/com/aliyun/dts/subscribe/clients/recordfetcher/ConsumerWrap.java [97:120]
/**
 * Subscribes the wrapped consumer to the topic of {@code topicPartition} and installs a
 * rebalance listener that positions the fetch offset once that partition is assigned.
 *
 * <p>On every assignment the listener stores the assigned partitions in
 * {@code consumerContext}, warns when the context reports no valid partitions, and, if the
 * assignment contains {@code topicPartition}, obtains a checkpoint from
 * {@code streamCheckpoint} and passes it to {@code setFetchOffsetByTimestamp}.
 *
 * @param topicPartition the partition whose topic is subscribed and whose offset is set
 *        when it shows up in an assignment
 * @param streamCheckpoint supplier queried inside the assignment callback, so it is
 *        evaluated each time {@code topicPartition} is assigned rather than once up front
 * @param isCheckpointNotExistThrowException forwarded unchanged to
 *        {@code setFetchOffsetByTimestamp}; presumably decides whether a missing checkpoint
 *        raises an exception -- confirm against that method
 */
public void subscribeTopic(TopicPartition topicPartition, Supplier<Checkpoint> streamCheckpoint, boolean isCheckpointNotExistThrowException) {
    Collection<String> subscribedTopics = Arrays.asList(topicPartition.topic());

    ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Revocation is only reported; no state is changed here.
            String revoked = StringUtils.join(partitions, ",");
            log.info("RecordFetcher consumer: partition revoked for [{}]", revoked);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            String assigned = StringUtils.join(partitions, ",");
            log.info("RecordFetcher consumer: partition assigned for [{}]", assigned);

            // Record the new assignment before checking whether it is usable.
            consumerContext.setTopicPartitions(partitions);
            boolean hasUsablePartitions = consumerContext.hasValidTopicPartitions();
            if (!hasUsablePartitions) {
                log.warn("In subscribe mode, recordFetcher consumer dose not assigned any partition, probably this client is a backup...");
            }

            // Nothing to position unless the requested partition is part of this assignment.
            if (!partitions.contains(topicPartition)) {
                return;
            }

            Checkpoint checkpoint = streamCheckpoint.get();
            setFetchOffsetByTimestamp(topicPartition, checkpoint, isCheckpointNotExistThrowException);
            log.info("RecordFetcher consumer: subscribe for [{}] with checkpoint [{}] start", topicPartition, checkpoint);
        }
    };

    consumer.subscribe(subscribedTopics, rebalanceListener);
}