in pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java [354:412]
public void validate() {
if (StringUtils.isEmpty(elasticSearchUrl)) {
throw new IllegalArgumentException("elasticSearchUrl not set.");
}
if (StringUtils.isNotEmpty(indexName) && createIndexIfNeeded) {
// see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#indices-create-api-path-params
// date format may contain upper cases, so we need to valid against parsed index name
IndexNameFormatter.validate(indexName);
if (indexName.startsWith("-") || indexName.startsWith("_") || indexName.startsWith("+")) {
throw new IllegalArgumentException("indexName start with an invalid character.");
}
if (indexName.equals(".") || indexName.equals("..")) {
throw new IllegalArgumentException("indexName cannot be . or ..");
}
if (indexName.getBytes(StandardCharsets.UTF_8).length > 255) {
throw new IllegalArgumentException("indexName cannot be longer than 255 bytes.");
}
}
if ((StringUtils.isNotEmpty(username) && StringUtils.isEmpty(password))
|| (StringUtils.isEmpty(username) && StringUtils.isNotEmpty(password))) {
throw new IllegalArgumentException("Values for both Username & password are required.");
}
boolean basicAuthSet = StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password);
boolean tokenAuthSet = StringUtils.isNotEmpty(token);
boolean apiKeySet = StringUtils.isNotEmpty(apiKey);
if ((basicAuthSet && tokenAuthSet && apiKeySet)
|| (basicAuthSet && tokenAuthSet)
|| (basicAuthSet && apiKeySet)
|| (tokenAuthSet && apiKeySet)) {
throw new IllegalArgumentException("Only one between basic/token/apiKey authentication mode must be configured.");
}
if (indexNumberOfShards <= 0) {
throw new IllegalArgumentException("indexNumberOfShards must be a strictly positive integer.");
}
if (indexNumberOfReplicas < 0) {
throw new IllegalArgumentException("indexNumberOfReplicas must be a positive integer.");
}
if (connectTimeoutInMs < 0) {
throw new IllegalArgumentException("connectTimeoutInMs must be a positive integer.");
}
if (connectionRequestTimeoutInMs < 0) {
throw new IllegalArgumentException("connectionRequestTimeoutInMs must be a positive integer.");
}
if (socketTimeoutInMs < 0) {
throw new IllegalArgumentException("socketTimeoutInMs must be a positive integer.");
}
if (bulkConcurrentRequests < 0) {
throw new IllegalArgumentException("bulkConcurrentRequests must be a positive integer.");
}
}