private void fillRecords()

in src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java [160:208]


    /**
     * Drains documents from the internal queue and appends them to {@code records} as
     * Kafka {@link SourceRecord}s addressed to {@code topic}.
     *
     * <p>Polling stops as soon as one of these holds: the number of records added by this
     * call reaches {@code config.getTaskBatchSize()}, the byte budget from
     * {@code config.getTaskBufferSize()} is used up, or {@code config.getTaskTimeout()}
     * milliseconds have elapsed since the call started. A document that would overflow
     * the byte budget is put back on the queue for a later call, unless it is the first
     * document of this call, in which case it is emitted anyway so that a single
     * oversized document cannot be deferred forever.
     *
     * @param records list that the converted records are appended to
     * @param topic   Kafka topic each record is created for
     * @throws InterruptedException if the thread is interrupted while waiting on the queue
     */
    private void fillRecords(List<SourceRecord> records, String topic) throws InterruptedException {
        // Primitives: the byte budget is decremented per record, so avoid re-boxing it.
        long remainingBytes = config.getTaskBufferSize();
        long batchSize = config.getTaskBatchSize();
        long deadline = System.currentTimeMillis() + config.getTaskTimeout();

        int count = 0;
        while (remainingBytes > 0 && count < batchSize && System.currentTimeMillis() < deadline) {
            JsonNode node = this.queue.poll(config.getTaskPollInterval(), TimeUnit.MILLISECONDS);

            // Nothing arrived within the poll interval; re-check the deadline and try again.
            if (node == null) {
                continue;
            }

            try {
                // Set the Kafka message key if option is enabled and field is configured in document
                String messageKey = "";
                if (config.isMessageKeyEnabled()) {
                    JsonNode messageKeyFieldNode = node.get(config.getMessageKeyField());
                    messageKey = (messageKeyFieldNode != null) ? messageKeyFieldNode.toString() : "";
                }

                // Get the latest token and record as offset
                Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getContinuationToken());
                logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));

                // Convert JSON to Kafka Connect struct and JSON schema
                SchemaAndValue schemaAndValue = jsonToStruct.recordToSchemaAndValue(node);

                // The continuation token is attached as the record's source offset.
                // NOTE(review): an earlier comment here claimed the lease container maintains
                // state so the offset need not be sent to Kafka -- confirm which is intended.
                SourceRecord sourceRecord = new SourceRecord(partitionMap, sourceOffset, topic,
                                                    Schema.STRING_SCHEMA, messageKey,
                                                    schemaAndValue.schema(), schemaAndValue.value());

                // Measure with an explicit charset so the accounting does not depend on the
                // platform default encoding.
                long recordBytes = sourceRecord.value().toString()
                        .getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
                remainingBytes -= recordBytes;

                // Budget exhausted: defer this document to a later call. The first record of a
                // call is exempt, otherwise a document at least as large as the whole budget
                // would be re-queued on every call and never delivered.
                // NOTE(review): add() presumably appends at the tail of the queue, so a deferred
                // document may be emitted after newer ones -- confirm ordering is not required.
                if (remainingBytes <= 0 && count > 0) {
                    this.queue.add(node);
                    break;
                }

                records.add(sourceRecord);
                count++;
            } catch (Exception e) {
                // Pass the exception so the stack trace is logged along with the topic.
                logger.error("Failed to fill Source Records for Topic {}", topic, e);
                throw e;
            }
        }
    }