in src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java [211:230]
public void stop() {
logger.info("Stopping CosmosDB source task.");
while (!this.queue.isEmpty()) {
// Wait till the items are drained by poll before stopping.
try {
sleep(500);
} catch (InterruptedException e) {
logger.error("Interrupted! Failed to stop the task", e);
// Restore interrupted state...
Thread.currentThread().interrupt();
}
}
running.set(false);
// Release all the resources.
if (changeFeedProcessor != null) {
changeFeedProcessor.stop();
changeFeedProcessor = null;
}
}