in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSinkFactoryBase.java [130:171]
/**
 * Runs the sanity checks for the sink options carried by {@code config}.
 *
 * <p>The checks run in a fixed order: hosts, index, bulk-flush max actions, bulk-flush max
 * size, bulk-flush backoff retries and finally the username/password pair. Each condition is
 * handed to {@code validate} together with a supplier that builds the error message on demand.
 *
 * @param config the Elasticsearch configuration to check
 */
void validateConfiguration(ElasticsearchConfiguration config) {
    // The result is discarded on purpose; the call is made only so the hosts get validated.
    config.getHosts();

    final boolean indexNonEmpty = config.getIndex().length() > 0;
    validate(
            indexNonEmpty,
            () -> String.format("'%s' must not be empty", INDEX_OPTION.key()));

    // -1 is accepted in addition to any value of 1 or more.
    final int actionLimit = config.getBulkFlushMaxActions();
    final boolean actionLimitOk = actionLimit >= 1 || actionLimit == -1;
    validate(
            actionLimitOk,
            () ->
                    String.format(
                            "'%s' must be at least 1. Got: %s",
                            BULK_FLUSH_MAX_ACTIONS_OPTION.key(), actionLimit));

    // The size must be -1 or a positive whole multiple of one megabyte.
    final long bytesPerMb = 1024L * 1024L;
    final long sizeLimitBytes = config.getBulkFlushMaxByteSize().getBytes();
    final boolean wholeMegabytes =
            sizeLimitBytes >= bytesPerMb && sizeLimitBytes % bytesPerMb == 0;
    validate(
            sizeLimitBytes == -1 || wholeMegabytes,
            () ->
                    String.format(
                            "'%s' must be in MB granularity. Got: %s",
                            BULK_FLUSH_MAX_SIZE_OPTION.key(),
                            config.getBulkFlushMaxByteSize().toHumanReadableString()));

    // An absent retry count passes; a present one passes only when it is 1 or more.
    final boolean retriesOk =
            !config.getBulkFlushBackoffRetries().filter(count -> count < 1).isPresent();
    validate(
            retriesOk,
            () ->
                    String.format(
                            "'%s' must be at least 1. Got: %s",
                            BULK_FLUSH_BACKOFF_MAX_RETRIES_OPTION.key(),
                            config.getBulkFlushBackoffRetries().get()));

    // The password is only inspected when a non-blank username was supplied.
    final boolean usernameGiven =
            config.getUsername().isPresent()
                    && !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get());
    if (!usernameGiven) {
        return;
    }

    final boolean passwordGiven =
            config.getPassword().isPresent()
                    && !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get());
    validate(
            passwordGiven,
            () ->
                    String.format(
                            "'%s' and '%s' must be set at the same time. Got: username '%s' and password '%s'",
                            USERNAME_OPTION.key(),
                            PASSWORD_OPTION.key(),
                            config.getUsername().get(),
                            config.getPassword().orElse("")));
}