public void ack()

in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java [228:264]


        /**
         * Acknowledges one record and, once no records remain outstanding, flushes the offsets.
         *
         * <p>Each call decrements {@code outstandingRecords}. When the counter reaches zero and a
         * {@code flushFuture} is present, the method begins a flush on {@code offsetWriter}, waits up to
         * {@code FLUSH_TIMEOUT_MS} milliseconds for it to finish and then calls {@code sourceTask.commit()}.
         *
         * <p>Every failure path cancels the pending flush where one was started and completes
         * {@code flushFuture} exceptionally, so that callers waiting on the future are not left blocked
         * after a failed, interrupted or timed-out flush.
         */
        public void ack() {
            // TODO: should flush for each batch instead of waiting until every record has been acked.
            // How to handle ordering between batches: e.g. a queue of (batch, AtomicInteger) pairs,
            // checking whether the head batch is fully acked.
            boolean canFlush = (outstandingRecords.decrementAndGet() == 0);

            // consumed all the records, flush the offsets
            if (canFlush && flushFuture != null) {
                if (!offsetWriter.beginFlush()) {
                    log.error("When beginFlush, No offsets to commit!");
                    flushFuture.completeExceptionally(new Exception("No Offsets Added Error when beginFlush"));
                    return;
                }

                Future<Void> doFlush = offsetWriter.doFlush(this::completedFlushOffset);
                if (doFlush == null) {
                    // Offsets added in processSourceRecord, But here no offsets to commit
                    log.error("No offsets to commit!");
                    flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
                    return;
                }

                // Wait until the offsets are flushed
                try {
                    doFlush.get(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                    sourceTask.commit();
                } catch (InterruptedException e) {
                    log.warn("Flush of {} offsets interrupted, cancelling", this, e);
                    offsetWriter.cancelFlush();
                    // Fail the future so waiters do not stall; presumably a no-op if the flush
                    // callback already completed it (CompletableFuture semantics) - confirm.
                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets", e));
                    // Restore the interrupt status so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e) {
                    log.error("Flush of {} offsets threw an unexpected exception: ", this, e);
                    offsetWriter.cancelFlush();
                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets", e));
                } catch (TimeoutException e) {
                    log.error("Timed out waiting to flush {} offsets to storage", this, e);
                    offsetWriter.cancelFlush();
                    flushFuture.completeExceptionally(new Exception("Failed to commit offsets", e));
                }
            }
        }