in iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java [195:497]
public static void loadPipeInternalConfig(CommonConfig config, TrimProperties properties)
throws IOException {
config.setPipeNonForwardingEventsProgressReportInterval(
Integer.parseInt(
properties.getProperty(
"pipe_non_forwarding_events_progress_report_interval",
Integer.toString(config.getPipeNonForwardingEventsProgressReportInterval()))));
config.setPipeFileReceiverFsyncEnabled(
Boolean.parseBoolean(
properties.getProperty(
"pipe_file_receiver_fsync_enabled",
Boolean.toString(config.getPipeFileReceiverFsyncEnabled()))));
config.setPipeDataStructureTabletRowSize(
Integer.parseInt(
properties.getProperty(
"pipe_data_structure_tablet_row_size",
String.valueOf(config.getPipeDataStructureTabletRowSize()))));
config.setPipeDataStructureTabletSizeInBytes(
Integer.parseInt(
properties.getProperty(
"pipe_data_structure_tablet_size_in_bytes",
String.valueOf(config.getPipeDataStructureTabletSizeInBytes()))));
config.setPipeDataStructureTabletMemoryBlockAllocationRejectThreshold(
Double.parseDouble(
properties.getProperty(
"pipe_data_structure_tablet_memory_block_allocation_reject_threshold",
String.valueOf(
config.getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold()))));
config.setPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold(
Double.parseDouble(
properties.getProperty(
"pipe_data_structure_ts_file_memory_block_allocation_reject_threshold",
String.valueOf(
config.getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold()))));
config.setPipeTotalFloatingMemoryProportion(
Double.parseDouble(
properties.getProperty(
"pipe_total_floating_memory_proportion",
String.valueOf(config.getPipeTotalFloatingMemoryProportion()))));
config.setPipeRealTimeQueuePollTsFileThreshold(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_realtime_queue_poll_history_threshold"))
.orElse(
properties.getProperty(
"pipe_realtime_queue_poll_tsfile_threshold",
String.valueOf(config.getPipeRealTimeQueuePollTsFileThreshold())))));
config.setPipeRealTimeQueuePollHistoricalTsFileThreshold(
Integer.parseInt(
properties.getProperty(
"pipe_realtime_queue_poll_historical_tsfile_threshold",
String.valueOf(config.getPipeRealTimeQueuePollHistoricalTsFileThreshold()))));
config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
Integer.parseInt(
properties.getProperty(
"pipe_subtask_executor_basic_check_point_interval_by_consumed_event_count",
String.valueOf(
config.getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount()))));
config.setPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration(
Long.parseLong(
properties.getProperty(
"pipe_subtask_executor_basic_check_point_interval_by_time_duration",
String.valueOf(
config.getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration()))));
config.setPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(
Long.parseLong(
properties.getProperty(
"pipe_subtask_executor_pending_queue_max_blocking_time_ms",
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
config.setPipeSubtaskExecutorForcedRestartIntervalMs(
Long.parseLong(
properties.getProperty(
"pipe_subtask_executor_forced_restart_interval_ms",
String.valueOf(config.getPipeSubtaskExecutorForcedRestartIntervalMs()))));
config.setPipeExtractorAssignerDisruptorRingBufferSize(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_source_assigner_disruptor_ring_buffer_size"))
.orElse(
properties.getProperty(
"pipe_extractor_assigner_disruptor_ring_buffer_size",
String.valueOf(
config.getPipeExtractorAssignerDisruptorRingBufferSize())))));
config.setPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes( // 1MB
Integer.parseInt(
Optional.ofNullable(
properties.getProperty(
"pipe_source_assigner_disruptor_ring_buffer_entry_size_in_bytes"))
.orElse(
properties.getProperty(
"pipe_extractor_assigner_disruptor_ring_buffer_entry_size_in_bytes",
String.valueOf(
config
.getPipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes())))));
config.setPipeExtractorMatcherCacheSize(
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_source_matcher_cache_size"))
.orElse(
properties.getProperty(
"pipe_extractor_matcher_cache_size",
String.valueOf(config.getPipeExtractorMatcherCacheSize())))));
config.setPipeConnectorHandshakeTimeoutMs(
Long.parseLong(
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))
.orElse(
properties.getProperty(
"pipe_connector_handshake_timeout_ms",
String.valueOf(config.getPipeConnectorHandshakeTimeoutMs())))));
config.setPipeConnectorReadFileBufferSize(
Integer.parseInt(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_size"))
.orElse(
properties.getProperty(
"pipe_connector_read_file_buffer_size",
String.valueOf(config.getPipeConnectorReadFileBufferSize())))));
config.setIsPipeConnectorReadFileBufferMemoryControlEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_read_file_buffer_memory_control"))
.orElse(
properties.getProperty(
"pipe_connector_read_file_buffer_memory_control",
String.valueOf(
config.isPipeConnectorReadFileBufferMemoryControlEnabled())))));
config.setPipeConnectorRetryIntervalMs(
Long.parseLong(
Optional.ofNullable(properties.getProperty("pipe_sink_retry_interval_ms"))
.orElse(
properties.getProperty(
"pipe_connector_retry_interval_ms",
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
config.setPipeConnectorRPCThriftCompressionEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
.orElse(
properties.getProperty(
"pipe_connector_rpc_thrift_compression_enabled",
String.valueOf(config.isPipeConnectorRPCThriftCompressionEnabled())))));
config.setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
Long.parseLong(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_max_retry_execution_time_ms_per_call"))
.orElse(
properties.getProperty(
"pipe_async_connector_max_retry_execution_time_ms_per_call",
String.valueOf(
config.getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall())))));
config.setPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_forced_retry_tsfile_event_queue_size"))
.orElse(
properties.getProperty(
"pipe_async_connector_forced_retry_tsfile_event_queue_size",
String.valueOf(
config
.getPipeAsyncConnectorForcedRetryTsFileEventQueueSizeThreshold())))));
config.setPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_forced_retry_tablet_event_queue_size"))
.orElse(
properties.getProperty(
"pipe_async_connector_forced_retry_tablet_event_queue_size",
String.valueOf(
config
.getPipeAsyncConnectorForcedRetryTabletEventQueueSizeThreshold())))));
config.setPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold(
Integer.parseInt(
Optional.ofNullable(
properties.getProperty("pipe_async_sink_forced_retry_total_event_queue_size"))
.orElse(
properties.getProperty(
"pipe_async_connector_forced_retry_total_event_queue_size",
String.valueOf(
config
.getPipeAsyncConnectorForcedRetryTotalEventQueueSizeThreshold())))));
config.setRateLimiterHotReloadCheckIntervalMs(
Integer.parseInt(
properties.getProperty(
"rate_limiter_hot_reload_check_interval_ms",
String.valueOf(config.getRateLimiterHotReloadCheckIntervalMs()))));
config.setPipeConnectorRequestSliceThresholdBytes(
Integer.parseInt(
properties.getProperty(
"pipe_connector_request_slice_threshold_bytes",
String.valueOf(config.getPipeConnectorRequestSliceThresholdBytes()))));
config.setPipeReceiverLoginPeriodicVerificationIntervalMs(
Long.parseLong(
properties.getProperty(
"pipe_receiver_login_periodic_verification_interval_ms",
Long.toString(config.getPipeReceiverLoginPeriodicVerificationIntervalMs()))));
config.setPipeReceiverActualToEstimatedMemoryRatio(
Double.parseDouble(
properties.getProperty(
"pipe_receiver_actual_to_estimated_memory_ratio",
Double.toString(config.getPipeReceiverActualToEstimatedMemoryRatio()))));
config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
Integer.parseInt(
properties.getProperty(
"pipe_max_allowed_historical_tsfile_per_data_region",
String.valueOf(config.getPipeMaxAllowedHistoricalTsFilePerDataRegion()))));
config.setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
Integer.parseInt(
properties.getProperty(
"pipe_max_allowed_pending_tsfile_epoch_per_data_region",
String.valueOf(config.getPipeMaxAllowedPendingTsFileEpochPerDataRegion()))));
config.setPipeMaxAllowedPinnedMemTableCount(
Integer.parseInt(
properties.getProperty(
"pipe_max_allowed_pinned_memtable_count",
String.valueOf(config.getPipeMaxAllowedPinnedMemTableCount()))));
config.setPipeMaxAllowedLinkedTsFileCount(
Long.parseLong(
properties.getProperty(
"pipe_max_allowed_linked_tsfile_count",
String.valueOf(config.getPipeMaxAllowedLinkedTsFileCount()))));
config.setPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage(
Float.parseFloat(
properties.getProperty(
"pipe_max_allowed_linked_deleted_tsfile_disk_usage_percentage",
String.valueOf(config.getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage()))));
config.setPipeStuckRestartIntervalSeconds(
Long.parseLong(
properties.getProperty(
"pipe_stuck_restart_interval_seconds",
String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
config.setPipeStuckRestartMinIntervalMs(
Long.parseLong(
properties.getProperty(
"pipe_stuck_restart_min_interval_ms",
String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
config.setPipeEpochKeepTsFileAfterStuckRestartEnabled(
Boolean.parseBoolean(
properties.getProperty(
"pipe_epoch_keep_tsfile_after_stuck_restart_enabled",
String.valueOf(config.isPipeEpochKeepTsFileAfterStuckRestartEnabled()))));
config.setPipeStorageEngineFlushTimeIntervalMs(
Long.parseLong(
properties.getProperty(
"pipe_storage_engine_flush_time_interval_ms",
String.valueOf(config.getPipeStorageEngineFlushTimeIntervalMs()))));
config.setPipeMemoryAllocateMaxRetries(
Integer.parseInt(
properties.getProperty(
"pipe_memory_allocate_max_retries",
String.valueOf(config.getPipeMemoryAllocateMaxRetries()))));
config.setPipeMemoryAllocateRetryIntervalInMs(
Long.parseLong(
properties.getProperty(
"pipe_memory_allocate_retry_interval_in_ms",
String.valueOf(config.getPipeMemoryAllocateRetryIntervalInMs()))));
config.setPipeMemoryAllocateMinSizeInBytes(
Long.parseLong(
properties.getProperty(
"pipe_memory_allocate_min_size_in_bytes",
String.valueOf(config.getPipeMemoryAllocateMinSizeInBytes()))));
config.setPipeMemoryAllocateForTsFileSequenceReaderInBytes(
Long.parseLong(
properties.getProperty(
"pipe_memory_allocate_for_tsfile_sequence_reader_in_bytes",
String.valueOf(config.getPipeMemoryAllocateForTsFileSequenceReaderInBytes()))));
config.setPipeMemoryExpanderIntervalSeconds(
Long.parseLong(
properties.getProperty(
"pipe_memory_expander_interval_seconds",
String.valueOf(config.getPipeMemoryExpanderIntervalSeconds()))));
config.setPipeCheckMemoryEnoughIntervalMs(
Long.parseLong(
properties.getProperty(
"pipe_check_memory_enough_interval_ms",
String.valueOf(config.getPipeCheckMemoryEnoughIntervalMs()))));
config.setPipeLeaderCacheMemoryUsagePercentage(
Float.parseFloat(
properties.getProperty(
"pipe_leader_cache_memory_usage_percentage",
String.valueOf(config.getPipeLeaderCacheMemoryUsagePercentage()))));
config.setPipeMaxAlignedSeriesNumInOneBatch(
Integer.parseInt(
properties.getProperty(
"pipe_max_aligned_series_num_in_one_batch",
String.valueOf(config.getPipeMaxAlignedSeriesNumInOneBatch()))));
config.setPipeRemainingTimeCommitRateAutoSwitchSeconds(
Long.parseLong(
properties.getProperty(
"pipe_remaining_time_commit_rate_auto_switch_seconds",
String.valueOf(config.getPipeRemainingTimeCommitRateAutoSwitchSeconds()))));
config.setPipeTsFileScanParsingThreshold(
Double.parseDouble(
properties.getProperty(
"pipe_tsfile_scan_parsing_threshold",
String.valueOf(config.getPipeTsFileScanParsingThreshold()))));
}