in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java [228:264]
public void ack() {
    // TODO: flush offsets per batch instead of waiting until all outstanding
    // records are acked. To preserve ordering between batches, keep a FIFO queue
    // of (batch, AtomicInteger) pairs and flush while the head batch is fully
    // acked (see the sketch after this method).
    boolean canFlush = (outstandingRecords.decrementAndGet() == 0);
    // All outstanding records have been acked; flush the offsets.
    if (canFlush && flushFuture != null) {
        if (!offsetWriter.beginFlush()) {
            log.error("beginFlush failed: no offsets to commit!");
            flushFuture.completeExceptionally(new Exception("No offsets to commit when beginFlush was called"));
            return;
        }
        Future<Void> doFlush = offsetWriter.doFlush(this::completedFlushOffset);
        if (doFlush == null) {
            // Offsets were added in processSourceRecord, but there is nothing to commit here.
            log.error("No offsets to commit!");
            flushFuture.completeExceptionally(new Exception("No offsets to commit"));
            return;
        }
        // Wait until the offsets are flushed, then let the source task commit.
        try {
            doFlush.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            sourceTask.commit();
        } catch (InterruptedException e) {
            log.warn("Flush of {} offsets interrupted, cancelling", this);
            offsetWriter.cancelFlush();
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            log.error("Flush of {} offsets threw an unexpected exception: ", this, e);
            offsetWriter.cancelFlush();
        } catch (TimeoutException e) {
            log.error("Timed out waiting to flush {} offsets to storage", this);
            offsetWriter.cancelFlush();
        }
    }
}
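
A minimal sketch of the batching idea from the TODO above, assuming one ack per record and a monotonically increasing batch id: each batch goes into a FIFO queue paired with an outstanding-ack counter, and offsets are flushed batch by batch as soon as the head of the queue is fully acked, rather than waiting for every outstanding record. BatchAckTracker, onBatchRead, onRecordAcked, and flushBatchOffsets are hypothetical names for illustration, not existing Pulsar APIs.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BatchAckTracker {

    // One entry per batch: its id and how many of its records are still unacked.
    private static final class BatchTracker {
        final long batchId;
        final AtomicInteger outstanding;

        BatchTracker(long batchId, int recordCount) {
            this.batchId = batchId;
            this.outstanding = new AtomicInteger(recordCount);
        }
    }

    private final Queue<BatchTracker> pendingBatches = new ConcurrentLinkedQueue<>();

    // Called when a new batch of records is handed over for publishing.
    void onBatchRead(long batchId, int recordCount) {
        pendingBatches.add(new BatchTracker(batchId, recordCount));
    }

    // Called once per acked record; a linear scan is fine for a short queue.
    void onRecordAcked(long batchId) {
        for (BatchTracker tracker : pendingBatches) {
            if (tracker.batchId == batchId) {
                tracker.outstanding.decrementAndGet();
                break;
            }
        }
        // Flush in order: only ever flush the head of the queue, so offsets are
        // never committed past a batch that still has unacked records. remove()
        // relies on reference equality here (BatchTracker does not override
        // equals), so concurrent ackers cannot flush the same batch twice.
        BatchTracker head;
        while ((head = pendingBatches.peek()) != null && head.outstanding.get() == 0) {
            if (pendingBatches.remove(head)) {
                flushBatchOffsets(head.batchId);
            }
        }
    }

    // Hypothetical per-batch equivalent of offsetWriter.beginFlush()/doFlush()
    // followed by sourceTask.commit() in the method above.
    void flushBatchOffsets(long batchId) {
    }
}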