private CosmosAsyncContainer createNewLeaseContainer()

in src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java [278:310]


    private CosmosAsyncContainer createNewLeaseContainer(CosmosAsyncClient client, String databaseName, String leaseCollectionName) {
        CosmosAsyncDatabase database = client.getDatabase(databaseName);
        CosmosAsyncContainer leaseCollection = database.getContainer(leaseCollectionName);
        CosmosContainerResponse leaseContainerResponse = null;

        logger.info("Checking whether the lease container exists.");
        try {
            leaseContainerResponse = leaseCollection.read().block();
        } catch (CosmosException ex) {
            // Swallowing exceptions when the type is CosmosException and statusCode is 404
            if (ex.getStatusCode() != 404) {
                throw ex;
            }
            logger.info("Lease container does not exist {}", ex.getMessage());
        }

        if (leaseContainerResponse == null) {
            logger.info("Creating the Lease container : {}", leaseCollectionName);
            CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id");
            ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(400);
            CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();

            try {
                database.createContainer(containerSettings, throughputProperties, requestOptions).block();
            } catch (Exception e) {
                logger.error("Failed to create container {} in database {}", leaseCollectionName, databaseName);
                throw e;
            }
            logger.info("Successfully created new lease container.");
        }

        return database.getContainer(leaseCollectionName);
    }