in src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java [139:157]
/**
 * Gathers one batch of source records for the container assigned to this task.
 *
 * <p>The topic is resolved once from the configured topic/container map. While the
 * {@code running} flag is set, {@code fillRecords} is invoked repeatedly on the same
 * batch; polling stops as soon as the batch is empty after a fill or the deadline
 * (start time plus the configured task timeout) has passed.
 *
 * @return the accumulated records; empty when nothing was collected or the task is
 *         not running
 * @throws IllegalStateException if no topic is mapped to the assigned container
 * @throws InterruptedException declared by the signature; not thrown directly by the
 *         statements in this method
 */
public List<SourceRecord> poll() throws InterruptedException {
    final List<SourceRecord> batch = new ArrayList<>();
    // The deadline is fixed before the topic lookup, so lookup time counts against it.
    final long deadline = System.currentTimeMillis() + config.getTaskTimeout();

    final String targetTopic = config.getTopicContainerMap()
        .getTopicForContainer(config.getAssignedContainer())
        .orElseThrow(() -> new IllegalStateException(
            "No topic defined for container " + config.getAssignedContainer() + "."));

    while (running.get()) {
        fillRecords(batch, targetTopic);

        // NOTE(review): an empty batch ends the poll immediately, whereas a non-empty
        // batch keeps being refilled until the deadline — confirm this is intended.
        final boolean keepFilling = !batch.isEmpty() && System.currentTimeMillis() <= deadline;
        if (keepFilling) {
            continue;
        }

        logger.debug("Sending {} documents.", batch.size());
        break;
    }

    return batch;
}