in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java [130:158]
public synchronized Record<T> read() throws Exception {
while (true) {
if (currentBatch == null) {
flushFuture = new CompletableFuture<>();
List<SourceRecord> recordList = sourceTask.poll();
if (recordList == null || recordList.isEmpty()) {
Thread.sleep(1000);
continue;
}
outstandingRecords.addAndGet(recordList.size());
currentBatch = recordList.iterator();
}
if (currentBatch.hasNext()) {
AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next());
if (processRecord.isEmpty()) {
outstandingRecords.decrementAndGet();
continue;
} else {
return processRecord;
}
} else {
// there is no records any more, then waiting for the batch to complete writing
// to sink and the offsets are committed as well, then do next round read.
flushFuture.get();
flushFuture = null;
currentBatch = null;
}
}
}