public void start()

in src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java [52:106]


    public void start(Map<String, String> map) {
        logger.info("Starting CosmosDBSourceTask.");
        config = new CosmosDBSourceConfig(map);        
        this.queue = new LinkedTransferQueue<>();

        logger.info("Creating the client.");
        client = getCosmosClient();

        // Initialize the database, feed and lease containers
        CosmosAsyncDatabase database = client.getDatabase(config.getDatabaseName());
        String container = config.getAssignedContainer();
        CosmosAsyncContainer feedContainer = database.getContainer(container);
        leaseContainer = createNewLeaseContainer(client, config.getDatabaseName(), container + "-leases");

        // Create source partition map
        partitionMap = new HashMap<>();
        partitionMap.put("DatabaseName", config.getDatabaseName());
        partitionMap.put("Container", config.getAssignedContainer());
        
        Map<String, Object> offset = context.offsetStorageReader().offset(partitionMap);
        // If NOT using the latest offset, reset lease container token to earliest possible value
        if (!config.useLatestOffset()) {
            updateContinuationToken(ZERO_CONTINUATION_TOKEN);
        } else if (offset != null) {
            // Check for previous offset and compare with lease container token
            // If there's a mismatch, rewind lease container token to offset value
            String lastOffsetToken = (String) offset.get(OFFSET_KEY);
            String continuationToken = getContinuationToken();

            if (continuationToken != null && !lastOffsetToken.equals(continuationToken)) {
                logger.info("Mismatch in last offset {} and current continuation token {}.", 
                    lastOffsetToken, continuationToken);
                updateContinuationToken(lastOffsetToken);
            }
        }

        // Initiate Cosmos change feed processor
        changeFeedProcessor = getChangeFeedProcessor(config.getWorkerName(),feedContainer,leaseContainer);
        changeFeedProcessor.start()
                .subscribeOn(Schedulers.boundedElastic())
                .doOnSuccess(aVoid -> running.set(true))
                .subscribe();

        while (!running.get()) {
            try {
                sleep(500);
            } catch (InterruptedException e) {
                logger.warn("Interrupted!", e);                
                // Restore interrupted state...
                Thread.currentThread().interrupt();                
            }
        } // Wait for ChangeFeedProcessor to start.

        logger.info("Started CosmosDB source task.");
    }