in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/plugin/builtin/sink/protocol/IoTDBConnector.java [308:374]
public void customize(
    final PipeParameters parameters, final PipeSinkRuntimeConfiguration configuration)
    throws Exception {
  // Loads this connector's settings from the given pipe parameters. Each option is requested
  // through an ...OrDefault getter with both its CONNECTOR_* and SINK_* key, plus a default
  // constant as the fallback argument. The configuration argument is not read in this method.

  // Target node URLs: drop whatever was held before and re-parse from the parameters.
  nodeUrls.clear();
  nodeUrls.addAll(parseNodeUrls(parameters));
  LOGGER.info("IoTDBConnector nodeUrls: {}", nodeUrls);

  // Batch mode is on when the batch-mode flag is true; only if it is false is the format
  // option consulted, in which case batch mode is on when the format equals the TsFile value.
  boolean batchModeEnabled =
      parameters.getBooleanOrDefault(
          Arrays.asList(CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, SINK_IOTDB_BATCH_MODE_ENABLE_KEY),
          CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE);
  if (!batchModeEnabled) {
    final String format =
        parameters.getStringOrDefault(
            Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY), CONNECTOR_FORMAT_HYBRID_VALUE);
    batchModeEnabled = format.equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
  }
  isTabletBatchModeEnabled = batchModeEnabled;
  LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled);

  shouldMarkAsPipeRequest =
      parameters.getBooleanOrDefault(
          Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY),
          CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
  LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest);

  // Receiver status handling options, read in the order the handler's constructor takes them.
  // The conflict strategy string is trimmed and compared case-insensitively against "retry".
  final String conflictResolveStrategy =
      parameters.getStringOrDefault(
          Arrays.asList(
              CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY,
              SINK_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_KEY),
          CONNECTOR_EXCEPTION_CONFLICT_RESOLVE_STRATEGY_DEFAULT_VALUE);
  final boolean retryOnConflict = conflictResolveStrategy.trim().equalsIgnoreCase("retry");

  final long conflictRetryMaxTimeSeconds =
      parameters.getLongOrDefault(
          Arrays.asList(
              CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY,
              SINK_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_KEY),
          CONNECTOR_EXCEPTION_CONFLICT_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE);

  final boolean recordIgnoredDataOnConflict =
      parameters.getBooleanOrDefault(
          Arrays.asList(
              CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY,
              SINK_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_KEY),
          CONNECTOR_EXCEPTION_CONFLICT_RECORD_IGNORED_DATA_DEFAULT_VALUE);

  final long othersRetryMaxTimeSeconds =
      parameters.getLongOrDefault(
          Arrays.asList(
              CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY,
              SINK_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_KEY),
          CONNECTOR_EXCEPTION_OTHERS_RETRY_MAX_TIME_SECONDS_DEFAULT_VALUE);

  final boolean recordIgnoredDataOnOthers =
      parameters.getBooleanOrDefault(
          Arrays.asList(
              CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY,
              SINK_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_KEY),
          CONNECTOR_EXCEPTION_OTHERS_RECORD_IGNORED_DATA_DEFAULT_VALUE);

  receiverStatusHandler =
      new PipeReceiverStatusHandler(
          retryOnConflict,
          conflictRetryMaxTimeSeconds,
          recordIgnoredDataOnConflict,
          othersRetryMaxTimeSeconds,
          recordIgnoredDataOnOthers);

  shouldReceiverConvertOnTypeMismatch =
      parameters.getBooleanOrDefault(
          Arrays.asList(
              CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
              SINK_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY),
          CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_DEFAULT_VALUE);
  // The log line is labelled with the CONNECTOR_* key name rather than a fixed description.
  LOGGER.info(
      "IoTDBConnector {} = {}",
      CONNECTOR_EXCEPTION_DATA_CONVERT_ON_TYPE_MISMATCH_KEY,
      shouldReceiverConvertOnTypeMismatch);
}