in src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java [160:208]
    private void fillRecords(List<SourceRecord> records, String topic) throws InterruptedException {
        Long bufferSize = config.getTaskBufferSize();
        Long batchSize = config.getTaskBatchSize();
        long maxWaitTime = System.currentTimeMillis() + config.getTaskTimeout();
        int count = 0;
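
        // Keep draining the change feed queue until the byte budget is spent,
        // the batch size is reached, or the task timeout elapses.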
        while (bufferSize > 0 && count < batchSize && System.currentTimeMillis() < maxWaitTime) {
            JsonNode node = this.queue.poll(config.getTaskPollInterval(), TimeUnit.MILLISECONDS);

            // Nothing arrived within the poll interval; try again until the timeout expires.
            if (node == null) {
                continue;
            }

            try {
                // Set the Kafka message key if the option is enabled and the field is present in the document
                String messageKey = "";
                if (config.isMessageKeyEnabled()) {
                    JsonNode messageKeyFieldNode = node.get(config.getMessageKeyField());
                    messageKey = (messageKeyFieldNode != null) ? messageKeyFieldNode.toString() : "";
                }

                // Get the latest continuation token and record it as the source offset
                Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getContinuationToken());
                logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));

                // Convert the JSON document into a Kafka Connect schema and value
                SchemaAndValue schemaAndValue = jsonToStruct.recordToSchemaAndValue(node);

                // Since the lease container maintains the change feed state, the source offset
                // does not have to be sent to Kafka; it is attached to the record anyway.
                SourceRecord sourceRecord = new SourceRecord(partitionMap, sourceOffset, topic,
                        Schema.STRING_SCHEMA, messageKey,
                        schemaAndValue.schema(), schemaAndValue.value());

                // Charge the serialized record value against the remaining buffer budget
                bufferSize -= sourceRecord.value().toString().getBytes().length;

                // If this record would exceed the buffer budget, put the node back on the
                // queue instead of adding the record, and stop filling.
                if (bufferSize <= 0) {
                    this.queue.add(node);
                    break;
                }

                records.add(sourceRecord);
                count++;
            } catch (Exception e) {
                logger.error("Failed to fill Source Records for Topic {}", topic);
                throw e;
            }
        }
    }
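
The loop's draining pattern (byte budget, batch cap, poll timeout) can be read independently of the connector. Below is a minimal, standalone sketch of that pattern over a plain BlockingQueue<String>; all names in it (BatchDrainSketch, drain, and the literal limits) are illustrative assumptions, not part of the connector's API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchDrainSketch {

    // Drain items from the queue until the byte budget, batch cap, or deadline is hit.
    static List<String> drain(BlockingQueue<String> queue, long byteBudget, int batchCap,
                              long timeoutMs, long pollIntervalMs) throws InterruptedException {
        List<String> batch = new ArrayList<>();
        long deadline = System.currentTimeMillis() + timeoutMs;

        while (byteBudget > 0 && batch.size() < batchCap && System.currentTimeMillis() < deadline) {
            String item = queue.poll(pollIntervalMs, TimeUnit.MILLISECONDS);
            if (item == null) {
                continue; // nothing arrived within the poll interval; keep waiting
            }
            byteBudget -= item.getBytes().length;
            if (byteBudget <= 0) {
                queue.add(item); // over budget: return the item and stop, as fillRecords does
                break;
            }
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("{\"id\":\"1\"}");
        queue.add("{\"id\":\"2\"}");
        System.out.println(drain(queue, 1024, 10, 500, 100));
    }
}

As in fillRecords, the item that would overflow the budget is returned to the queue rather than dropped.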