in src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java [261:276]
protected void handleCosmosDbChanges(List<JsonNode> docs) {
for (JsonNode document : docs) {
// Blocks for each transfer till it is processed by the poll method.
// If we fail before checkpointing then the new worker starts again.
try {
logger.debug("Queuing document : {}", document);
this.queue.transfer(document);
} catch (InterruptedException e) {
logger.error("Interrupted! changeFeedReader.", e);
// Restore interrupted state...
Thread.currentThread().interrupt();
}
}
}