public synchronized Record<T> read()

in pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java [130:158]


    /**
     * Returns the next non-empty record produced by the underlying source task,
     * blocking until one is available.
     *
     * <p>Records are fetched in batches via {@code sourceTask.poll()}. When a poll yields
     * nothing ({@code null} or an empty list) the method sleeps for one second and polls
     * again. Each polled record is converted with {@code processSourceRecord}; converted
     * records that report {@code isEmpty()} are skipped and removed from the
     * {@code outstandingRecords} count. Once a batch is exhausted the method waits on
     * {@code flushFuture} before polling the next batch.
     *
     * <p>The whole loop runs while holding this object's monitor, including the sleep
     * and the wait on {@code flushFuture}.
     *
     * @return the next non-empty processed record
     * @throws Exception if polling or record processing fails, if the wait on
     *                   {@code flushFuture} fails, or if the thread is interrupted
     *                   while sleeping or waiting
     */
    public synchronized Record<T> read() throws Exception {
        while (true) {
            if (currentBatch == null) {
                // A fresh future per batch; it is awaited below once the batch is drained.
                flushFuture = new CompletableFuture<>();
                List<SourceRecord> recordList = sourceTask.poll();
                if (recordList == null || recordList.isEmpty()) {
                    // Nothing available yet: back off before polling again.
                    Thread.sleep(1000);
                    continue;
                }
                outstandingRecords.addAndGet(recordList.size());
                currentBatch = recordList.iterator();
            }
            if (currentBatch.hasNext()) {
                AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next());
                if (!processRecord.isEmpty()) {
                    return processRecord;
                }
                // Empty records are never handed to the caller, so they must not be
                // counted as outstanding.
                // NOTE(review): if every record of a batch is empty, nothing from this batch
                // is returned to the caller; confirm that flushFuture still gets completed
                // in that case, otherwise the wait below would block indefinitely.
                outstandingRecords.decrementAndGet();
            } else {
                // there is no records any more, then waiting for the batch to complete writing
                // to sink and the offsets are committed as well, then do next round read.
                try {
                    flushFuture.get();
                } finally {
                    // Reset the batch state even when the wait fails. Otherwise the next
                    // read() would find the exhausted iterator and the same failed future,
                    // and rethrow the same error on every call without ever polling again.
                    flushFuture = null;
                    currentBatch = null;
                }
            }
        }
    }