public BigtableDataClient getBigtableDataClient()

in kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java [508:589]


  public BigtableDataClient getBigtableDataClient() {
    Duration totalTimeout = getTotalRetryTimeout();
    RetrySettings retrySettings = getRetrySettings(totalTimeout, Duration.ZERO);
    Optional<CredentialsProvider> credentialsProvider =
        getUserConfiguredBigtableCredentialsProvider();

    BigtableDataSettings.Builder dataSettingsBuilder =
        BigtableDataSettings.newBuilder()
            .setProjectId(getString(GCP_PROJECT_ID_CONFIG))
            .setInstanceId(getString(BIGTABLE_INSTANCE_ID_CONFIG));
    if (credentialsProvider.isPresent()) {
      dataSettingsBuilder.setCredentialsProvider(credentialsProvider.get());
    } else {
      // Use the default credential provider that utilizes Application Default Credentials.
    }
    String appProfileId = getString(BIGTABLE_APP_PROFILE_ID_CONFIG);
    if (appProfileId == null) {
      dataSettingsBuilder.setDefaultAppProfileId();
    } else {
      dataSettingsBuilder.setAppProfileId(appProfileId);
    }

    EnhancedBigtableStubSettings.Builder dataStubSettings = dataSettingsBuilder.stubSettings();
    dataStubSettings.setHeaderProvider(getHeaderProvider());
    dataStubSettings.mutateRowSettings().setRetrySettings(retrySettings);
    dataStubSettings.checkAndMutateRowSettings().setRetrySettings(retrySettings);
    dataStubSettings.bulkMutateRowsSettings().setRetrySettings(retrySettings);
    dataStubSettings.readRowSettings().setRetrySettings(retrySettings);
    dataStubSettings.readRowsSettings().setRetrySettings(retrySettings);
    dataStubSettings.bulkReadRowsSettings().setRetrySettings(retrySettings);

    // After a schema modification, Bigtable API sometimes transiently returns NOT_FOUND and
    // FAILED_PRECONDITION errors. We just need to retry them. We do it if - and only if - the
    // column family auto creation is enabled, because then BigtableSchemaManager ensures that
    // processing of records, for which the tables and/or column families are missing, is
    // finished early, before the actual data requests are sent.
    if (getBoolean(AUTO_CREATE_COLUMN_FAMILIES_CONFIG)) {
      Set<StatusCode.Code> transientErrorsAfterSchemaModification =
          Set.of(StatusCode.Code.NOT_FOUND, StatusCode.Code.FAILED_PRECONDITION);
      dataStubSettings
          .mutateRowSettings()
          .setRetryableCodes(
              Sets.union(
                  dataStubSettings.mutateRowSettings().getRetryableCodes(),
                  transientErrorsAfterSchemaModification));
      dataStubSettings
          .checkAndMutateRowSettings()
          .setRetryableCodes(
              Sets.union(
                  dataStubSettings.checkAndMutateRowSettings().getRetryableCodes(),
                  transientErrorsAfterSchemaModification));
      dataStubSettings
          .bulkMutateRowsSettings()
          .setRetryableCodes(
              Sets.union(
                  dataStubSettings.bulkMutateRowsSettings().getRetryableCodes(),
                  transientErrorsAfterSchemaModification));
      dataStubSettings
          .readRowSettings()
          .setRetryableCodes(
              Sets.union(
                  dataStubSettings.readRowSettings().getRetryableCodes(),
                  transientErrorsAfterSchemaModification));
      dataStubSettings
          .readRowsSettings()
          .setRetryableCodes(
              Sets.union(
                  dataStubSettings.readRowsSettings().getRetryableCodes(),
                  transientErrorsAfterSchemaModification));
      dataStubSettings
          .bulkReadRowsSettings()
          .setRetryableCodes(
              Sets.union(
                  dataStubSettings.bulkReadRowsSettings().getRetryableCodes(),
                  transientErrorsAfterSchemaModification));
    }
    try {
      return BigtableDataClient.create(dataSettingsBuilder.build());
    } catch (IOException e) {
      throw new RetriableException(e);
    }
  }