public void validate()

in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java [146:305]


  /**
   * Checks the sink's configuration attributes and records several parsed values on this
   * instance.
   *
   * <p>The following are checked through {@code validator}: presence of a target address, a
   * positive batch size, non-conflicting user/username synonyms, the load-balance strategy, the
   * load-tsfile strategy, the zstd compression level, each configured compressor name, the number
   * of compressors, the conflict-resolve strategy and the format value.
   *
   * <p>As a side effect this method assigns {@code username}, {@code password}, {@code
   * loadBalanceStrategy}, {@code loadTsFileStrategy}, {@code loadTsFileValidation} and {@code
   * isRpcCompressionEnabled}, and appends one compressor per configured name to {@code
   * compressors}.
   *
   * <p>Every attribute is looked up under both its {@code CONNECTOR_*} and its {@code SINK_*} key.
   *
   * @param validator gives access to the pipe parameters and performs the individual checks
   * @throws Exception declared by the signature; presumably raised by the validator when a check
   *     fails (the validator implementation is not visible from this method)
   */
  public void validate(final PipeParameterValidator validator) throws Exception {
    final PipeParameters parameters = validator.getParameters();

    // A target is accepted when, within either key family, the node URLs are present, or a port
    // is present together with an ip or a host.
    // NOTE(review): the IP keys are accepted by the rule but are not listed in the message below
    // - confirm whether that omission is intentional.
    validator.validate(
        args ->
            (boolean) args[0]
                || (((boolean) args[1] || (boolean) args[2]) && (boolean) args[3])
                || (boolean) args[4]
                || (((boolean) args[5] || (boolean) args[6]) && (boolean) args[7]),
        String.format(
            "One of %s, %s:%s, %s, %s:%s must be specified",
            CONNECTOR_IOTDB_NODE_URLS_KEY,
            CONNECTOR_IOTDB_HOST_KEY,
            CONNECTOR_IOTDB_PORT_KEY,
            SINK_IOTDB_NODE_URLS_KEY,
            SINK_IOTDB_HOST_KEY,
            SINK_IOTDB_PORT_KEY),
        parameters.hasAttribute(CONNECTOR_IOTDB_NODE_URLS_KEY),
        parameters.hasAttribute(CONNECTOR_IOTDB_IP_KEY),
        parameters.hasAttribute(CONNECTOR_IOTDB_HOST_KEY),
        parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
        parameters.hasAttribute(SINK_IOTDB_NODE_URLS_KEY),
        parameters.hasAttribute(SINK_IOTDB_IP_KEY),
        parameters.hasAttribute(SINK_IOTDB_HOST_KEY),
        parameters.hasAttribute(SINK_IOTDB_PORT_KEY));

    // Read the batch size once so the checked value and the reported value cannot diverge.
    final long configuredBatchSize =
        parameters.getLongOrDefault(
            Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
            CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
    validator.validate(
        batchSize -> (long) batchSize > 0,
        String.format(
            "%s must be > 0, but got %s", SINK_IOTDB_BATCH_SIZE_KEY, configuredBatchSize),
        configuredBatchSize);

    // Check coexistence of user and username
    validator.validateSynonymAttributes(
        Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
        Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
        false);

    username =
        parameters.getStringOrDefault(
            Arrays.asList(
                CONNECTOR_IOTDB_USER_KEY,
                SINK_IOTDB_USER_KEY,
                CONNECTOR_IOTDB_USERNAME_KEY,
                SINK_IOTDB_USERNAME_KEY),
            CONNECTOR_IOTDB_USER_DEFAULT_VALUE);
    password =
        parameters.getStringOrDefault(
            Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY),
            CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);

    // All user-supplied names below are lower-cased with Locale.ROOT: the no-arg toLowerCase()
    // follows the JVM default locale, and e.g. a Turkish default maps 'I' to a dotless i, which
    // would make a valid value such as "IGNORE" fail the membership checks.
    loadBalanceStrategy =
        parameters
            .getStringOrDefault(
                Arrays.asList(CONNECTOR_LOAD_BALANCE_STRATEGY_KEY, SINK_LOAD_BALANCE_STRATEGY_KEY),
                CONNECTOR_LOAD_BALANCE_ROUND_ROBIN_STRATEGY)
            .trim()
            .toLowerCase(java.util.Locale.ROOT);
    validator.validate(
        arg -> CONNECTOR_LOAD_BALANCE_STRATEGY_SET.contains(loadBalanceStrategy),
        String.format(
            "Load balance strategy should be one of %s, but got %s.",
            CONNECTOR_LOAD_BALANCE_STRATEGY_SET, loadBalanceStrategy),
        loadBalanceStrategy);

    loadTsFileStrategy =
        parameters
            .getStringOrDefault(
                Arrays.asList(CONNECTOR_LOAD_TSFILE_STRATEGY_KEY, SINK_LOAD_TSFILE_STRATEGY_KEY),
                CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE)
            .trim()
            .toLowerCase(java.util.Locale.ROOT);
    validator.validate(
        arg -> CONNECTOR_LOAD_TSFILE_STRATEGY_SET.contains(loadTsFileStrategy),
        String.format(
            "Load tsfile strategy should be one of %s, but got %s.",
            CONNECTOR_LOAD_TSFILE_STRATEGY_SET, loadTsFileStrategy),
        loadTsFileStrategy);
    loadTsFileValidation =
        parameters.getBooleanOrDefault(
            Arrays.asList(CONNECTOR_LOAD_TSFILE_VALIDATION_KEY, SINK_LOAD_TSFILE_VALIDATION_KEY),
            CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE);

    final int zstdCompressionLevel =
        parameters.getIntOrDefault(
            Arrays.asList(CONNECTOR_COMPRESSOR_ZSTD_LEVEL_KEY, SINK_COMPRESSOR_ZSTD_LEVEL_KEY),
            CONNECTOR_COMPRESSOR_ZSTD_LEVEL_DEFAULT_VALUE);
    validator.validate(
        arg ->
            (int) arg >= CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE
                && (int) arg <= CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE,
        String.format(
            "Zstd compression level should be in the range [%d, %d], but got %d.",
            CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MIN_VALUE,
            CONNECTOR_COMPRESSOR_ZSTD_LEVEL_MAX_VALUE,
            zstdCompressionLevel),
        zstdCompressionLevel);

    // The compressor attribute is a comma-separated list; blank entries are skipped.
    final String compressionTypes =
        parameters
            .getStringOrDefault(
                Arrays.asList(CONNECTOR_COMPRESSOR_KEY, SINK_COMPRESSOR_KEY),
                CONNECTOR_COMPRESSOR_DEFAULT_VALUE)
            .toLowerCase(java.util.Locale.ROOT);
    if (!compressionTypes.isEmpty()) {
      for (final String compressionType : compressionTypes.split(",")) {
        final String trimmedCompressionType = compressionType.trim();
        if (trimmedCompressionType.isEmpty()) {
          continue;
        }

        validator.validate(
            arg -> CONNECTOR_COMPRESSOR_SET.contains(trimmedCompressionType),
            String.format(
                "Compressor should be one of %s, but got %s.",
                CONNECTOR_COMPRESSOR_SET, trimmedCompressionType),
            trimmedCompressionType);
        // NOTE(review): compressors is only appended to here; if validate can run more than once
        // on the same instance the entries would accumulate - confirm against the caller.
        compressors.add(
            PipeCompressorFactory.getCompressor(
                new PipeCompressorConfig(trimmedCompressionType, zstdCompressionLevel)));
      }
    }
    validator.validate(
        arg -> compressors.size() <= Byte.MAX_VALUE,
        String.format(
            "The number of compressors should be less than or equal to %d, but got %d.",
            Byte.MAX_VALUE, compressors.size()),
        compressors.size());
    isRpcCompressionEnabled = !compressors.isEmpty();

    validator.validate(
        arg -> arg.equals("retry") || arg.equals("ignore"),
        String.format(
            "The value of key %s or %s must be either 'retry' or 'ignore'.",
            CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
            SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
        parameters
            .getStringOrDefault(
                Arrays.asList(
                    CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
                    SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
                CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
            .trim()
            .toLowerCase(java.util.Locale.ROOT));

    // The format value is range-checked under whichever of the two keys is actually present; the
    // SINK key is used when the CONNECTOR key is absent.
    validator.validateAttributeValueRange(
        parameters.hasAttribute(CONNECTOR_FORMAT_KEY) ? CONNECTOR_FORMAT_KEY : SINK_FORMAT_KEY,
        true,
        CONNECTOR_FORMAT_TABLET_VALUE,
        CONNECTOR_FORMAT_HYBRID_VALUE,
        CONNECTOR_FORMAT_TS_FILE_VALUE);
  }