public List poll()

in src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java [139:157]


    /**
     * Gathers source records for the topic mapped to the assigned container.
     *
     * <p>A deadline is computed up front from the configured task timeout. While the
     * task is flagged as running, {@code fillRecords} is invoked repeatedly on the same
     * list; the loop ends as soon as the list is empty after a fill, or the deadline
     * has passed. If the running flag is already cleared on entry, an empty list is
     * returned without calling {@code fillRecords}.
     *
     * @return the list handed to {@code fillRecords}; never {@code null}, possibly empty
     * @throws InterruptedException declared by this method; presumably propagated from
     *         {@code fillRecords} (its body is not visible here)
     * @throws IllegalStateException if no topic is mapped to the assigned container
     */
    public List<SourceRecord> poll() throws InterruptedException {
        final List<SourceRecord> batch = new ArrayList<>();

        // Fixed before the topic lookup, so lookup time counts against the timeout.
        final long deadline = System.currentTimeMillis() + config.getTaskTimeout();

        final TopicContainerMap containerTopics = config.getTopicContainerMap();
        final String targetTopic = containerTopics
            .getTopicForContainer(config.getAssignedContainer())
            .orElseThrow(() -> {
                return new IllegalStateException(
                    "No topic defined for container " + config.getAssignedContainer() + ".");
            });

        while (running.get()) {
            fillRecords(batch, targetTopic);

            // Keep filling only while something has been collected and time remains.
            final boolean keepFilling = !batch.isEmpty() && System.currentTimeMillis() <= deadline;
            if (keepFilling) {
                continue;
            }

            logger.debug("Sending {} documents.", batch.size());
            break;
        }

        return batch;
    }