static Config validate()

in kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java [126:216]


  /**
   * Validates the connector configuration, including constraints that span several properties.
   *
   * <p>Per-property validation from {@code getDefinition().validateAll(props)} runs first. Only
   * when it reports no errors are the following cross-property rules checked, each of which
   * attaches the same error message to both properties involved:
   *
   * <ul>
   *   <li>{@code GCP_CREDENTIALS_PATH_CONFIG} and {@code GCP_CREDENTIALS_JSON_CONFIG} must not
   *       both be non-blank;
   *   <li>{@code InsertMode.INSERT} requires {@code MAX_BATCH_SIZE_CONFIG} to be {@code 1};
   *   <li>{@code InsertMode.INSERT} cannot be combined with {@code NullValueMode.DELETE};
   *   <li>enabling {@code AUTO_CREATE_TABLES_CONFIG} requires
   *       {@code AUTO_CREATE_COLUMN_FAMILIES_CONFIG} to be enabled as well.
   * </ul>
   *
   * @param props raw connector properties to validate
   * @param accessBigtableToValidateConfiguration if {@code true} (and per-property validation
   *     passed), additionally calls {@code isBigtableConfigurationValid()} and, on failure, marks
   *     every property in {@code BIGTABLE_CONFIGURATION_PROPERTIES} as invalid
   * @return a {@code Config} wrapping the validation result of every defined property
   */
  static Config validate(Map<String, String> props, boolean accessBigtableToValidateConfiguration) {
    Map<String, ConfigValue> validationResult = getDefinition().validateAll(props);

    if (validationResult.values().stream().allMatch(v -> v.errorMessages().isEmpty())) {
      // We create it without validation to use the same getters as the config users.
      BigtableSinkConfig config = new BigtableSinkConfig(props);

      // Note that we only need to verify the properties we define, the generic Sink configuration
      // is handled in SinkConnectorConfig::validate().
      String credentialsPath = config.getString(GCP_CREDENTIALS_PATH_CONFIG);
      String credentialsJson = config.getString(GCP_CREDENTIALS_JSON_CONFIG);
      String insertMode = config.getString(INSERT_MODE_CONFIG);
      String nullValueMode = config.getString(VALUE_NULL_MODE_CONFIG);
      Integer maxBatchSize = config.getInt(MAX_BATCH_SIZE_CONFIG);
      Boolean autoCreateTables = config.getBoolean(AUTO_CREATE_TABLES_CONFIG);
      Boolean autoCreateColumnFamilies = config.getBoolean(AUTO_CREATE_COLUMN_FAMILIES_CONFIG);

      if (!Utils.isBlank(credentialsPath) && !Utils.isBlank(credentialsJson)) {
        String errorMessage =
            GCP_CREDENTIALS_JSON_CONFIG
                + " and "
                + GCP_CREDENTIALS_PATH_CONFIG
                + " are mutually exclusive options, but both are set.";
        addErrorMessage(
            validationResult, GCP_CREDENTIALS_JSON_CONFIG, credentialsJson, errorMessage);
        addErrorMessage(
            validationResult, GCP_CREDENTIALS_PATH_CONFIG, credentialsPath, errorMessage);
      }
      if (InsertMode.INSERT.name().equals(insertMode) && !Integer.valueOf(1).equals(maxBatchSize)) {
        String errorMessage =
            "When using `"
                + INSERT_MODE_CONFIG
                + "` of `"
                + InsertMode.INSERT.name()
                + "`, "
                + MAX_BATCH_SIZE_CONFIG
                + " must be set to `1`.";
        addErrorMessage(validationResult, INSERT_MODE_CONFIG, insertMode, errorMessage);
        addErrorMessage(
            validationResult, MAX_BATCH_SIZE_CONFIG, String.valueOf(maxBatchSize), errorMessage);
      }
      if (InsertMode.INSERT.name().equals(insertMode)
          && NullValueMode.DELETE.name().equals(nullValueMode)) {
        String errorMessage =
            "When using `"
                + VALUE_NULL_MODE_CONFIG
                + "` of `"
                + NullValueMode.DELETE.name()
                + "`, "
                + INSERT_MODE_CONFIG
                + " must not be set to `"
                + InsertMode.INSERT.name()
                + "`.";
        addErrorMessage(validationResult, INSERT_MODE_CONFIG, insertMode, errorMessage);
        addErrorMessage(validationResult, VALUE_NULL_MODE_CONFIG, nullValueMode, errorMessage);
      }
      if (Boolean.TRUE.equals(autoCreateTables) && Boolean.FALSE.equals(autoCreateColumnFamilies)) {
        String errorMessage =
            "If you enable `"
                + AUTO_CREATE_TABLES_CONFIG
                + "`, you must also enable `"
                + AUTO_CREATE_COLUMN_FAMILIES_CONFIG
                + "`.";
        addErrorMessage(
            validationResult,
            AUTO_CREATE_TABLES_CONFIG,
            String.valueOf(autoCreateTables),
            errorMessage);
        // Flag the column-families property too, so both halves of the conflicting pair carry the
        // error (the same pattern as the checks above).
        addErrorMessage(
            validationResult,
            AUTO_CREATE_COLUMN_FAMILIES_CONFIG,
            String.valueOf(autoCreateColumnFamilies),
            errorMessage);
      }

      if (accessBigtableToValidateConfiguration) {
        // We validate the user's credentials in order to warn them early rather than fill DLQ
        // with records whose processing would fail due to invalid credentials.
        // We only call it after the per-property validation has passed since creating
        // a Cloud Bigtable client uses many of these parameters, and we don't want to warn
        // the user unnecessarily.
        // NOTE(review): errors added by the cross-property checks above do not skip this call;
        // confirm whether it should also be gated on those.
        if (!config.isBigtableConfigurationValid()) {
          String errorMessage = "Cloud Bigtable configuration is invalid.";
          for (String bigtableProp : BIGTABLE_CONFIGURATION_PROPERTIES) {
            addErrorMessage(validationResult, bigtableProp, props.get(bigtableProp), errorMessage);
          }
        }
      }
    }
    return new Config(new ArrayList<>(validationResult.values()));
  }