in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java [146:305]
public void validate(final PipeParameterValidator validator) throws Exception {
final PipeParameters parameters = validator.getParameters();
validator.validate(
args ->
(boolean) args[0]
|| (((boolean) args[1] || (boolean) args[2]) && (boolean) args[3])
|| (boolean) args[4]
|| (((boolean) args[5] || (boolean) args[6]) && (boolean) args[7]),
String.format(
"One of %s, %s:%s, %s, %s:%s must be specified",
CONNECTOR_IOTDB_NODE_URLS_KEY,
CONNECTOR_IOTDB_HOST_KEY,
CONNECTOR_IOTDB_PORT_KEY,
SINK_IOTDB_NODE_URLS_KEY,
SINK_IOTDB_HOST_KEY,
SINK_IOTDB_PORT_KEY),
parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
parameters.hasAttribute(SINK_IOTDB_IP_KEY),
parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
parameters.hasAttribute(SINK_IOTDB_PORT_KEY));
validator.validate(
requestMaxBatchSizeInBytes -> (long) requestMaxBatchSizeInBytes > 0,
String.format(
"%s must be > 0, but got %s",
SINK_IOTDB_BATCH_SIZE_KEY,
parameters.getLongOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE)),
parameters.getLongOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE));
// Check coexistence of user and username
validator.validateSynonymAttributes(
Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
false);
username =
parameters.getStringOrDefault(
Arrays.asList(
CONNECTOR_IOTDB_USER_KEY,
SINK_IOTDB_USER_KEY,
CONNECTOR_IOTDB_USERNAME_KEY,
SINK_IOTDB_USERNAME_KEY),
CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
password =
parameters.getStringOrDefault(
Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY),
CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
loadBalanceStrategy =
parameters
.getStringOrDefault(
Arrays.asList(CONNECTOR_LOAD_BALANCE_STRATEGY_KEY, SINK_LOAD_BALANCE_STRATEGY_KEY),
CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY)
.trim()
.toLowerCase();
validator.validate(
arg -> CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(loadBalanceStrategy),
String.format(
"Load balance strategy should be one of %s, but got %s.",
CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
loadBalanceStrategy);
loadTsFileStrategy =
parameters
.getStringOrDefault(
Arrays.asList(CONNECTOR_LOAD_TSFILE_STRATEGY_KEY, SINK_LOAD_TSFILE_STRATEGY_KEY),
CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE)
.trim()
.toLowerCase();
validator.validate(
arg -> CONNECTOR_LOAD_TSFILE_STRATEGY_SET.contains(loadTsFileStrategy),
String.format(
"Load tsfile strategy should be one of %s, but got %s.",
CONNECTOR_LOAD_TSFILE_STRATEGY_SET, loadTsFileStrategy),
loadTsFileStrategy);
loadTsFileValidation =
parameters.getBooleanOrDefault(
Arrays.asList(CONNECTOR_LOAD_TSFILE_VALIDATION_KEY, SINK_LOAD_TSFILE_VALIDATION_KEY),
CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE);
final int zstdCompressionLevel =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY, SINK_COMPRESSOR_ZSTD_LEVEL_KEY),
CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE);
validator.validate(
arg ->
(int) arg >= CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE
&& (int) arg <= CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE,
String.format(
"Zstd compression level should be in the range [%d, %d], but got %d.",
CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE,
CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE,
zstdCompressionLevel),
zstdCompressionLevel);
final String compressionTypes =
parameters
.getStringOrDefault(
Arrays.asList(CONNECTOR_COMPRESSOR_KEY, SINK_COMPRESSOR_KEY),
CONNECTOR_COMPRESSOR_DEFAULT_VALUE)
.toLowerCase();
if (!compressionTypes.isEmpty()) {
for (final String compressionType : compressionTypes.split(",")) {
final String trimmedCompressionType = compressionType.trim();
if (trimmedCompressionType.isEmpty()) {
continue;
}
validator.validate(
arg -> CONNECTOR_COMPRESSOR_SET.contains(trimmedCompressionType),
String.format(
"Compressor should be one of %s, but got %s.",
CONNECTOR_COMPRESSOR_SET, trimmedCompressionType),
trimmedCompressionType);
compressors.add(
PipeCompressorFactory.getCompressor(
new PipeCompressorConfig(trimmedCompressionType, zstdCompressionLevel)));
}
}
validator.validate(
arg -> compressors.size() <= Byte.MAX_VALUE,
String.format(
"The number of compressors should be less than or equal to %d, but got %d.",
Byte.MAX_VALUE, compressors.size()),
compressors.size());
isRpcCompressionEnabled = !compressors.isEmpty();
validator.validate(
arg -> arg.equals("retry") || arg.equals("ignore"),
String.format(
"The value of key %s or %s must be either 'retry' or 'ignore'.",
CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
parameters
.getStringOrDefault(
Arrays.asList(
CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
.trim()
.toLowerCase());
validator.validateAttributeValueRange(
validator.getParameters().hasAttribute(CONNECTOR_FORMAT_KEY)
? CONNECTOR_FORMAT_KEY
: SINK_FORMAT_KEY,
true,
CONNECTOR_FORMAT_TABLET_VALUE,
CONNECTOR_FORMAT_HYBRID_VALUE,
CONNECTOR_FORMAT_TS_FILE_VALUE);
}