in kafka-connect-bigtable-sink/sink/src/main/java/com/google/cloud/kafka/connect/bigtable/config/BigtableSinkConfig.java [126:216]
/**
 * Validates the connector properties and reports every problem found per property.
 *
 * <p>Validation happens in up to three stages:
 *
 * <ol>
 *   <li>per-property validation via {@code getDefinition().validateAll(props)},
 *   <li>cross-property consistency checks, run only if stage 1 reported no errors,
 *   <li>optionally, a check of the Cloud Bigtable configuration via {@code
 *       isBigtableConfigurationValid()}, run only if no errors have been recorded so far.
 * </ol>
 *
 * <p>Only the properties defined by this class are verified here; the generic Sink configuration
 * is handled in SinkConnectorConfig::validate().
 *
 * @param props the raw connector properties to validate
 * @param accessBigtableToValidateConfiguration whether to additionally call {@code
 *     isBigtableConfigurationValid()} on the parsed configuration once all other checks passed
 * @return a {@link Config} containing the {@link ConfigValue}s (including any error messages)
 *     produced by the validation
 */
static Config validate(Map<String, String> props, boolean accessBigtableToValidateConfiguration) {
  Map<String, ConfigValue> validationResult = getDefinition().validateAll(props);
  if (validationResult.values().stream().allMatch(v -> v.errorMessages().isEmpty())) {
    // We create it without validation to use the same getters as the config users.
    BigtableSinkConfig config = new BigtableSinkConfig(props);
    // Note that we only need to verify the properties we define, the generic Sink configuration
    // is handled in SinkConnectorConfig::validate().
    String credentialsPath = config.getString(GCP_CREDENTIALS_PATH_CONFIG);
    String credentialsJson = config.getString(GCP_CREDENTIALS_JSON_CONFIG);
    String insertMode = config.getString(INSERT_MODE_CONFIG);
    String nullValueMode = config.getString(VALUE_NULL_MODE_CONFIG);
    Integer maxBatchSize = config.getInt(MAX_BATCH_SIZE_CONFIG);
    Boolean autoCreateTables = config.getBoolean(AUTO_CREATE_TABLES_CONFIG);
    Boolean autoCreateColumnFamilies = config.getBoolean(AUTO_CREATE_COLUMN_FAMILIES_CONFIG);

    // Each cross-property check below attaches the same message to both properties involved.
    if (!Utils.isBlank(credentialsPath) && !Utils.isBlank(credentialsJson)) {
      String errorMessage =
          GCP_CREDENTIALS_JSON_CONFIG
              + " and "
              + GCP_CREDENTIALS_PATH_CONFIG
              + " are mutually exclusive options, but both are set.";
      addErrorMessage(
          validationResult, GCP_CREDENTIALS_JSON_CONFIG, credentialsJson, errorMessage);
      addErrorMessage(
          validationResult, GCP_CREDENTIALS_PATH_CONFIG, credentialsPath, errorMessage);
    }
    if (InsertMode.INSERT.name().equals(insertMode) && !Integer.valueOf(1).equals(maxBatchSize)) {
      String errorMessage =
          "When using `"
              + INSERT_MODE_CONFIG
              + "` of `"
              + InsertMode.INSERT.name()
              + "`, "
              + MAX_BATCH_SIZE_CONFIG
              + " must be set to `1`.";
      addErrorMessage(validationResult, INSERT_MODE_CONFIG, insertMode, errorMessage);
      addErrorMessage(
          validationResult, MAX_BATCH_SIZE_CONFIG, String.valueOf(maxBatchSize), errorMessage);
    }
    if (InsertMode.INSERT.name().equals(insertMode)
        && NullValueMode.DELETE.name().equals(nullValueMode)) {
      String errorMessage =
          "When using `"
              + VALUE_NULL_MODE_CONFIG
              + "` of `"
              + NullValueMode.DELETE.name()
              + "`, "
              + INSERT_MODE_CONFIG
              + " must not be set to `"
              + InsertMode.INSERT.name()
              + "`.";
      addErrorMessage(validationResult, INSERT_MODE_CONFIG, insertMode, errorMessage);
      addErrorMessage(validationResult, VALUE_NULL_MODE_CONFIG, nullValueMode, errorMessage);
    }
    if (Boolean.TRUE.equals(autoCreateTables) && Boolean.FALSE.equals(autoCreateColumnFamilies)) {
      String errorMessage =
          "If you enable `"
              + AUTO_CREATE_TABLES_CONFIG
              + "`, you must also enable `"
              + AUTO_CREATE_COLUMN_FAMILIES_CONFIG
              + "`.";
      addErrorMessage(
          validationResult,
          AUTO_CREATE_TABLES_CONFIG,
          String.valueOf(autoCreateTables),
          errorMessage);
      // Report the column-families value against its own property, not against the
      // tables property a second time.
      addErrorMessage(
          validationResult,
          AUTO_CREATE_COLUMN_FAMILIES_CONFIG,
          String.valueOf(autoCreateColumnFamilies),
          errorMessage);
    }

    // We validate the user's credentials in order to warn them early rather than fill DLQ
    // with records whose processing would fail due to invalid credentials.
    // We only call it after validating that all other parameters are fine since creating
    // a Cloud Bigtable client uses many of these parameters, and we don't want to warn
    // the user unnecessarily. Hence the re-check: the cross-property checks above may have
    // recorded errors since the per-property validation passed.
    if (accessBigtableToValidateConfiguration
        && validationResult.values().stream().allMatch(v -> v.errorMessages().isEmpty())) {
      if (!config.isBigtableConfigurationValid()) {
        String errorMessage = "Cloud Bigtable configuration is invalid.";
        for (String bigtableProp : BIGTABLE_CONFIGURATION_PROPERTIES) {
          addErrorMessage(validationResult, bigtableProp, props.get(bigtableProp), errorMessage);
        }
      }
    }
  }
  return new Config(new ArrayList<>(validationResult.values()));
}