in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerConnector.java [171:183]
/**
 * Triggers an asynchronous offset commit and returns without waiting for it to finish.
 *
 * <p>The futures produced by {@code commitOffsetsAsync()} are combined through
 * {@code FutureUtil.waitForAll}; a completion callback inspects the combined outcome.
 * On failure the error is logged and, if requested, the whole commit is re-attempted
 * 30 seconds later on {@code this.executor}. Retries carry the same flag, so they
 * continue for as long as commits keep failing.
 *
 * @param retryOnFailure when {@code true}, a failed commit is rescheduled; when
 *                       {@code false}, the failure is only logged
 */
public void commitOffsets(boolean retryOnFailure) {
    FutureUtil.waitForAll(commitOffsetsAsync()).handle((res, ex) -> {
        if (ex == null) {
            return null;
        }
        // The returned future is discarded, so this log line is the only place a failed
        // commit becomes visible. Log at WARN and pass the throwable so the cause and
        // stack trace are not lost when debug logging is off.
        log.warn("Failed to commit offset {}, retrying {}", ex.getMessage(), retryOnFailure, ex);
        if (retryOnFailure) {
            try {
                this.executor.schedule(() -> commitOffsets(retryOnFailure), 30, TimeUnit.SECONDS);
            } catch (java.util.concurrent.RejectedExecutionException rejected) {
                // NOTE(review): presumably happens once the executor has been shut down
                // (e.g. connector closing) - confirm. Without this catch the rejection
                // would vanish inside the discarded future and retries would stop silently.
                log.warn("Unable to schedule offset commit retry", rejected);
            }
        }
        return null;
    });
}