public void customize()

in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java [308:374]


  public void customize(
      final PipeParameters parameters, final PipeSinkRuntimeConfiguration configuration)
      throws Exception {
    nodeUrls.clear();
    nodeUrls.addAll(parseNodeUrls(parameters));
    LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);

    isTabletBatchModeEnabled =
        parameters.getBooleanOrDefault(
                Arrays.asList(
                    CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
                CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE)
            || parameters
                .getStringOrDefault(
                    Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
                    CONNECTOR_FORMAT_HYBRID_VALUE)
                .equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
    LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled);

    shouldMarkAsPipeRequest =
        parameters.getBooleanOrDefault(
            Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY),
            CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
    LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest);

    receiverStatusHandler =
        new PipeReceiverStatusHandler(
            parameters
                .getStringOrDefault(
                    Arrays.asList(
                        CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
                        SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
                    CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE)
                .trim()
                .equalsIgnoreCase("retry"),
            parameters.getLongOrDefault(
                Arrays.asList(
                    CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY,
                    SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY),
                CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE),
            parameters.getBooleanOrDefault(
                Arrays.asList(
                    CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY,
                    SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY),
                CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE),
            parameters.getLongOrDefault(
                Arrays.asList(
                    CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY,
                    SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY),
                CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE),
            parameters.getBooleanOrDefault(
                Arrays.asList(
                    CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY,
                    SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY),
                CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE));

    shouldReceiverConvertOnTypeMismatch =
        parameters.getBooleanOrDefault(
            Arrays.asList(
                CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
                SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
            CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
    LOGGER.info(
        "IoTDBConnector {} = {}",
        CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
        shouldReceiverConvertOnTypeMismatch);
  }