in src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java [278:310]
/**
 * Returns a reference to the lease container, creating the container first if it does not exist.
 *
 * <p>The container is probed with a blocking {@code read()}. A {@link CosmosException} with
 * status 404 is interpreted as "container missing"; any other {@link CosmosException} is
 * rethrown. When the container is missing it is created with partition key path {@code /id}
 * and manual throughput of 400. A 409 (Conflict) on creation is treated as success, because
 * it means the container was created by someone else between the existence check and the
 * create call.
 *
 * @param client              Cosmos async client used to resolve the database
 * @param databaseName        name of the database that holds (or will hold) the lease container
 * @param leaseCollectionName name of the lease container
 * @return a reference to the lease container
 * @throws CosmosException if the existence check fails with a status other than 404, or the
 *                         creation fails with a status other than 409
 */
private CosmosAsyncContainer createNewLeaseContainer(CosmosAsyncClient client, String databaseName, String leaseCollectionName) {
    final int statusNotFound = 404;
    final int statusConflict = 409;

    CosmosAsyncDatabase database = client.getDatabase(databaseName);
    CosmosAsyncContainer leaseCollection = database.getContainer(leaseCollectionName);
    CosmosContainerResponse leaseContainerResponse = null;

    logger.info("Checking whether the lease container exists.");
    try {
        leaseContainerResponse = leaseCollection.read().block();
    } catch (CosmosException ex) {
        // Only "not found" is an expected outcome here; everything else is a real failure.
        if (ex.getStatusCode() != statusNotFound) {
            throw ex;
        }
        logger.info("Lease container does not exist {}", ex.getMessage());
    }

    if (leaseContainerResponse == null) {
        logger.info("Creating the Lease container : {}", leaseCollectionName);
        CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id");
        ThroughputProperties throughputProperties = ThroughputProperties.createManualThroughput(400);
        CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();
        try {
            database.createContainer(containerSettings, throughputProperties, requestOptions).block();
            logger.info("Successfully created new lease container.");
        } catch (CosmosException e) {
            if (e.getStatusCode() != statusConflict) {
                // Pass the exception as the last argument so the cause and stack trace are logged.
                logger.error("Failed to create container {} in database {}", leaseCollectionName, databaseName, e);
                throw e;
            }
            // The read-then-create sequence is not atomic: a conflict means the container
            // appeared after the existence check, which is the state we wanted anyway.
            logger.info("Lease container {} already exists; it was created concurrently.", leaseCollectionName);
        } catch (Exception e) {
            logger.error("Failed to create container {} in database {}", leaseCollectionName, databaseName, e);
            throw e;
        }
    }

    // The reference obtained above addresses the same container; no need to look it up again.
    return leaseCollection;
}