in src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java [52:106]
/**
 * Starts this source task.
 *
 * <p>Builds the task configuration from the supplied properties, creates the Cosmos client,
 * resolves the feed and lease containers, reconciles the lease container's continuation
 * token with the offset stored by Kafka Connect, and then launches the change feed processor.
 *
 * <p>The method blocks, polling every 500 ms, until the change feed processor signals a
 * successful start. If the calling thread is interrupted while waiting, the interrupt flag is
 * restored and the method returns immediately instead of continuing to wait.
 *
 * @param map task configuration properties used to construct the {@code CosmosDBSourceConfig}
 */
public void start(Map<String, String> map) {
    logger.info("Starting CosmosDBSourceTask.");
    config = new CosmosDBSourceConfig(map);
    this.queue = new LinkedTransferQueue<>();

    logger.info("Creating the client.");
    client = getCosmosClient();

    // Initialize the database, feed and lease containers.
    // The lease container name is derived from the assigned container name plus "-leases".
    CosmosAsyncDatabase database = client.getDatabase(config.getDatabaseName());
    String container = config.getAssignedContainer();
    CosmosAsyncContainer feedContainer = database.getContainer(container);
    leaseContainer = createNewLeaseContainer(client, config.getDatabaseName(), container + "-leases");

    // Create the source partition map used as the key for offset lookups.
    partitionMap = new HashMap<>();
    partitionMap.put("DatabaseName", config.getDatabaseName());
    partitionMap.put("Container", config.getAssignedContainer());

    Map<String, Object> offset = context.offsetStorageReader().offset(partitionMap);

    if (!config.useLatestOffset()) {
        // Not using the latest offset: reset the lease container token to ZERO_CONTINUATION_TOKEN.
        updateContinuationToken(ZERO_CONTINUATION_TOKEN);
    } else if (offset != null) {
        // A previous offset exists: compare it with the lease container token and,
        // on mismatch, rewind the lease container token to the stored offset value.
        String lastOffsetToken = (String) offset.get(OFFSET_KEY);
        String continuationToken = getContinuationToken();
        // The stored offset map may not contain OFFSET_KEY; without a stored token there is
        // nothing to rewind to, so skip the comparison rather than dereferencing null.
        if (lastOffsetToken != null
                && continuationToken != null
                && !lastOffsetToken.equals(continuationToken)) {
            logger.info("Mismatch in last offset {} and current continuation token {}.",
                    lastOffsetToken, continuationToken);
            updateContinuationToken(lastOffsetToken);
        }
    }

    // Initiate the Cosmos change feed processor; 'running' flips to true once start succeeds.
    // NOTE(review): if start() signals an error, nothing in this chain sets 'running', so the
    // wait loop below would keep polling - consider surfacing start failures to the caller.
    changeFeedProcessor = getChangeFeedProcessor(config.getWorkerName(), feedContainer, leaseContainer);
    changeFeedProcessor.start()
            .subscribeOn(Schedulers.boundedElastic())
            .doOnSuccess(aVoid -> running.set(true))
            .subscribe();

    // Wait for the ChangeFeedProcessor to start.
    while (!running.get()) {
        try {
            sleep(500);
        } catch (InterruptedException e) {
            logger.warn("Interrupted!", e);
            // Restore interrupted state and stop waiting. Looping again with the interrupt
            // flag set would make every subsequent sleep throw immediately, producing a
            // busy loop that spams the log.
            Thread.currentThread().interrupt();
            return;
        }
    }

    logger.info("Started CosmosDB source task.");
}