in src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [236:265]
public CompletableFuture<Long> seekCommittedOffset(MessageQueue messageQueue) {
return CompletableFuture.supplyAsync(
() -> {
try {
long offset =
consumer.getOffsetStore()
.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
if (offset == -1) {
offset = adminExt.minOffset(messageQueue);
LOG.info(
"Consumer seek committed offset from remote, offset=-1,mq={},use minOffset={}",
UtilAll.getQueueDescription(messageQueue),
offset);
}
LOG.info(
"Consumer seek committed offset from remote, mq={}, offset={}",
UtilAll.getQueueDescription(messageQueue),
offset);
return offset;
} catch (Exception e) {
LOG.error(
"Consumer seek committed offset from remote error, mq={}",
UtilAll.getQueueDescription(messageQueue),
e);
throw new RuntimeException(e);
}
},
commonExecutorService);
}