in kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java [508:589]
/**
 * Creates a {@link BigtableDataClient} configured from this sink configuration.
 *
 * <p>The client targets the configured GCP project and Bigtable instance, uses the
 * user-configured credentials provider when one is present, and selects either the configured
 * app profile or the default one. The same retry settings are applied to the mutate-row,
 * check-and-mutate-row, bulk-mutate-rows, read-row, read-rows and bulk-read-rows calls. When
 * column family auto creation is enabled, {@code NOT_FOUND} and {@code FAILED_PRECONDITION} are
 * added to the retryable codes of each of those calls.
 *
 * @return a newly created data client
 * @throws RetriableException if creating the client fails with an {@link IOException}
 */
public BigtableDataClient getBigtableDataClient() {
  RetrySettings callRetrySettings = getRetrySettings(getTotalRetryTimeout(), Duration.ZERO);
  Optional<CredentialsProvider> userCredentials = getUserConfiguredBigtableCredentialsProvider();

  BigtableDataSettings.Builder settings = BigtableDataSettings.newBuilder();
  settings.setProjectId(getString(GCP_PROJECT_ID_CONFIG));
  settings.setInstanceId(getString(BIGTABLE_INSTANCE_ID_CONFIG));

  // With no user-configured provider the builder keeps its default one, which relies on
  // Application Default Credentials.
  userCredentials.ifPresent(settings::setCredentialsProvider);

  String profileId = getString(BIGTABLE_APP_PROFILE_ID_CONFIG);
  if (profileId != null) {
    settings.setAppProfileId(profileId);
  } else {
    settings.setDefaultAppProfileId();
  }

  EnhancedBigtableStubSettings.Builder stub = settings.stubSettings();
  stub.setHeaderProvider(getHeaderProvider());

  // Every data call issued through this client shares the same retry policy.
  stub.mutateRowSettings().setRetrySettings(callRetrySettings);
  stub.checkAndMutateRowSettings().setRetrySettings(callRetrySettings);
  stub.bulkMutateRowsSettings().setRetrySettings(callRetrySettings);
  stub.readRowSettings().setRetrySettings(callRetrySettings);
  stub.readRowsSettings().setRetrySettings(callRetrySettings);
  stub.bulkReadRowsSettings().setRetrySettings(callRetrySettings);

  // Right after a schema change the Bigtable API may transiently answer with NOT_FOUND or
  // FAILED_PRECONDITION; retrying is enough to get past it. These codes are made retryable only
  // when column family auto creation is on, since in that mode BigtableSchemaManager finishes
  // records whose tables and/or column families are missing early, before any data request is
  // sent for them.
  if (getBoolean(AUTO_CREATE_COLUMN_FAMILIES_CONFIG)) {
    Set<StatusCode.Code> schemaChangeCodes =
        Set.of(StatusCode.Code.NOT_FOUND, StatusCode.Code.FAILED_PRECONDITION);

    Set<StatusCode.Code> mutateRowCodes =
        Sets.union(stub.mutateRowSettings().getRetryableCodes(), schemaChangeCodes);
    stub.mutateRowSettings().setRetryableCodes(mutateRowCodes);

    Set<StatusCode.Code> checkAndMutateRowCodes =
        Sets.union(stub.checkAndMutateRowSettings().getRetryableCodes(), schemaChangeCodes);
    stub.checkAndMutateRowSettings().setRetryableCodes(checkAndMutateRowCodes);

    Set<StatusCode.Code> bulkMutateRowsCodes =
        Sets.union(stub.bulkMutateRowsSettings().getRetryableCodes(), schemaChangeCodes);
    stub.bulkMutateRowsSettings().setRetryableCodes(bulkMutateRowsCodes);

    Set<StatusCode.Code> readRowCodes =
        Sets.union(stub.readRowSettings().getRetryableCodes(), schemaChangeCodes);
    stub.readRowSettings().setRetryableCodes(readRowCodes);

    Set<StatusCode.Code> readRowsCodes =
        Sets.union(stub.readRowsSettings().getRetryableCodes(), schemaChangeCodes);
    stub.readRowsSettings().setRetryableCodes(readRowsCodes);

    Set<StatusCode.Code> bulkReadRowsCodes =
        Sets.union(stub.bulkReadRowsSettings().getRetryableCodes(), schemaChangeCodes);
    stub.bulkReadRowsSettings().setRetryableCodes(bulkReadRowsCodes);
  }

  try {
    return BigtableDataClient.create(settings.build());
  } catch (IOException e) {
    // Surface client creation failures as retriable so the caller may attempt it again.
    throw new RetriableException(e);
  }
}