in src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java [304:326]
/**
 * Asynchronously looks up the offset of a message queue for a given timestamp.
 *
 * <p>The lookup is delegated to {@code adminExt.searchOffset} and runs on {@code
 * commonExecutorService}, so the calling thread is not blocked by the remote call.
 *
 * @param messageQueue the queue whose offset is searched
 * @param timestamp the timestamp passed to the remote offset search
 * @return a future completed with the offset returned by the admin client; if the search throws
 *     {@link MQClientException}, the future completes exceptionally with a {@link
 *     RuntimeException} whose cause is that exception
 */
public CompletableFuture<Long> seekOffsetByTimestamp(
        MessageQueue messageQueue, long timestamp) {
    return CompletableFuture.supplyAsync(
            () -> {
                try {
                    long offset = adminExt.searchOffset(messageQueue, timestamp);
                    LOG.info(
                            "Consumer seek offset by timestamp from remote, mq={}, timestamp={}, offset={}",
                            UtilAll.getQueueDescription(messageQueue),
                            timestamp,
                            offset);
                    return offset;
                } catch (MQClientException e) {
                    String queueDescription = UtilAll.getQueueDescription(messageQueue);
                    // A failed lookup is an error, not routine information: log it at
                    // ERROR so it is not lost when the log level is raised above INFO.
                    // The trailing throwable argument makes the logger print the stack trace.
                    LOG.error(
                            "Consumer seek offset by timestamp from remote error, mq={}, timestamp={}",
                            queueDescription,
                            timestamp,
                            e);
                    // Keep the original exception as the cause and add the lookup context,
                    // because the consumer of the future only sees this exception.
                    throw new RuntimeException(
                            String.format(
                                    "Failed to seek offset by timestamp, mq=%s, timestamp=%d",
                                    queueDescription, timestamp),
                            e);
                }
            },
            commonExecutorService);
}