in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java [186:376]
  public void customize(
      final PipeParameters parameters, final PipeProcessorRuntimeConfiguration configuration)
      throws Exception {
    final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment();
    pipeName = environment.getPipeName();
    dataBaseName =
        StorageEngine.getInstance()
            .getDataRegion(new DataRegionId(environment.getRegionId()))
            .getDatabaseName();
    if (dataBaseName != null) {
      isTableModel = PathUtils.isTableModelDatabase(dataBaseName);
    }
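    // Increase the reference count for this pipe and make sure its per-time-series
    // runtime state map exists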
    pipeName2referenceCountMap.compute(
        pipeName, (name, count) -> Objects.nonNull(count) ? count + 1 : 1);
    pipeName2timeSeries2TimeSeriesRuntimeStateMap.putIfAbsent(pipeName, new ConcurrentHashMap<>());
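    // Cache the database name with a trailing path separator for later path construction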
    databaseWithPathSeparator =
        StorageEngine.getInstance()
                .getDataRegion(
                    new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()))
                .getDatabaseName()
            + TsFileConstant.PATH_SEPARATOR;
    pipeTaskMeta =
        ((PipeTaskProcessorRuntimeEnvironment) configuration.getRuntimeEnvironment())
            .getPipeTaskMeta();
    // Load parameters
    final long outputMaxDelaySeconds =
        parameters.getLongOrDefault(
            PROCESSOR_OUTPUT_MAX_DELAY_SECONDS_KEY,
            PROCESSOR_OUTPUT_MAX_DELAY_SECONDS_DEFAULT_VALUE);
    // A negative delay disables the limit (Long.MAX_VALUE); otherwise the output max delay
    // milliseconds must be at least 1 to guarantee the correctness of the CAS on the last
    // receive time
    outputMaxDelayMilliseconds =
        outputMaxDelaySeconds < 0 ? Long.MAX_VALUE : Math.max(outputMaxDelaySeconds * 1000, 1);
    outputMinReportIntervalMilliseconds =
        parameters.getLongOrDefault(
                PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_KEY,
                PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_DEFAULT_VALUE)
            * 1000;
    final String outputDatabase =
        parameters.getStringOrDefault(
            PROCESSOR_OUTPUT_DATABASE_KEY, PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE);
    outputDatabaseWithPathSeparator =
        outputDatabase.isEmpty() ? outputDatabase : outputDatabase + TsFileConstant.PATH_SEPARATOR;
    // Parse the operator names and the output measurement names
    final List<String> operatorNameList =
        Arrays.stream(
                parameters
                    .getStringOrDefault(PROCESSOR_OPERATORS_KEY, PROCESSOR_OPERATORS_DEFAULT_VALUE)
                    .replace(" ", "")
                    .split(","))
            .collect(Collectors.toList());
    final String outputMeasurementString =
        parameters.getStringOrDefault(
            PROCESSOR_OUTPUT_MEASUREMENTS_KEY, PROCESSOR_OUTPUT_MEASUREMENTS_DEFAULT_VALUE);
    final List<String> outputMeasurementNameList =
        outputMeasurementString.isEmpty()
            ? Collections.emptyList()
            : Arrays.stream(outputMeasurementString.replace(" ", "").split(","))
                .collect(Collectors.toList());
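    // Map each configured aggregator (lower-cased) to its output measurement name;
    // when no explicit measurement is given, the aggregator's own name is reused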
    final Map<String, String> aggregatorName2OutputNameMap = new HashMap<>();
    for (int i = 0; i < operatorNameList.size(); ++i) {
      if (i < outputMeasurementNameList.size()) {
        aggregatorName2OutputNameMap.put(
            operatorNameList.get(i).toLowerCase(), outputMeasurementNameList.get(i));
      } else {
        aggregatorName2OutputNameMap.put(
            operatorNameList.get(i).toLowerCase(), operatorNameList.get(i));
      }
    }
    // Load the needed aggregators and the computational logic of their intermediate results
    final Set<String> declaredIntermediateResultSet = new HashSet<>();
    final PipeDataRegionPluginAgent agent = PipeDataNodeAgent.plugin().dataRegion();
    for (final String pipePluginName :
        agent.getSubProcessorNamesWithSpecifiedParent(AbstractOperatorProcessor.class)) {
      // Child processors may validate and configure their computational logic
      // from the same parameters, except for the processor name
      final AbstractOperatorProcessor operatorProcessor =
          (AbstractOperatorProcessor)
              agent.getConfiguredProcessor(pipePluginName, parameters, configuration);
      operatorProcessor.getAggregatorOperatorSet().stream()
          .filter(
              operator ->
                  aggregatorName2OutputNameMap.containsKey(operator.getName().toLowerCase()))
          .forEach(
              operator -> {
                outputName2OperatorMap.put(
                    aggregatorName2OutputNameMap.get(operator.getName().toLowerCase()), operator);
                declaredIntermediateResultSet.addAll(operator.getDeclaredIntermediateValueNames());
              });
      operatorProcessor
          .getIntermediateResultOperatorSupplierSet()
          .forEach(
              supplier ->
                  intermediateResultName2OperatorSupplierMap.put(
                      supplier.get().getName(), supplier));
      operatorProcessors.add(operatorProcessor);
    }
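    // Every configured aggregator must have been matched to a loaded operator above;
    // any leftover entry means the configuration is invalid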
    aggregatorName2OutputNameMap
        .entrySet()
        .removeIf(entry -> outputName2OperatorMap.containsKey(entry.getValue()));
    if (!aggregatorName2OutputNameMap.isEmpty()) {
      throw new PipeException(
          String.format(
              "The aggregator to output name mappings %s are invalid.",
              aggregatorName2OutputNameMap));
    }
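    // Keep only the intermediate result suppliers that the selected aggregators declared,
    // and fail if any declared intermediate result has no supplier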
    intermediateResultName2OperatorSupplierMap.keySet().retainAll(declaredIntermediateResultSet);
    declaredIntermediateResultSet.removeAll(intermediateResultName2OperatorSupplierMap.keySet());
    if (!declaredIntermediateResultSet.isEmpty()) {
      throw new PipeException(
          String.format(
              "The needed intermediate values %s are not defined.", declaredIntermediateResultSet));
    }
    // Set up column name strings
    columnNameStringList = new String[outputName2OperatorMap.size()];
    final List<String> operatorNames = new ArrayList<>(outputName2OperatorMap.keySet());
    for (int i = 0; i < outputName2OperatorMap.size(); ++i) {
      columnNameStringList[i] = operatorNames.get(i);
    }
    // Get the windowing processor
    final String processorName =
        parameters.getStringOrDefault(
                PROCESSOR_WINDOWING_STRATEGY_KEY, PROCESSOR_WINDOWING_STRATEGY_DEFAULT_VALUE)
            + WINDOWING_PROCESSOR_SUFFIX;
    final PipeProcessor windowProcessor =
        agent.getConfiguredProcessor(processorName, parameters, configuration);
    if (!(windowProcessor instanceof AbstractWindowingProcessor)) {
      throw new PipeException(
          String.format("The processor %s is not a windowing processor.", processorName));
    }
    windowingProcessor = (AbstractWindowingProcessor) windowProcessor;
    // Configure system parameters
    systemParameters.put(
        UDFParametersFactory.TIMESTAMP_PRECISION,
        CommonDescriptor.getInstance().getConfig().getTimestampPrecision());
    // The aggregated result operators can be configured here because they are global
    // and stateless, needing only one configuration
    this.outputName2OperatorMap
        .values()
        .forEach(operator -> operator.configureSystemParameters(systemParameters));
    // Restore window state
    final ProgressIndex index = pipeTaskMeta.getProgressIndex();
    // Nothing to restore when the progress index is still the initial minimum index
    if (index == MinimumProgressIndex.INSTANCE) {
      return;
    }
    if (!(index instanceof TimeWindowStateProgressIndex)) {
      throw new PipeException(
          String.format(
              "The aggregate processor does not support progressIndexType %s", index.getType()));
    }
    final TimeWindowStateProgressIndex timeWindowStateProgressIndex =
        (TimeWindowStateProgressIndex) index;
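    // Rebuild the runtime state of every recorded time series and restore its last
    // timestamp and window buffers from the serialized progress index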
    for (final Map.Entry<String, Pair<Long, ByteBuffer>> entry :
        timeWindowStateProgressIndex.getTimeSeries2TimestampWindowBufferPairMap().entrySet()) {
      final AtomicReference<TimeSeriesRuntimeState> stateReference =
          pipeName2timeSeries2TimeSeriesRuntimeStateMap
              .get(pipeName)
              .computeIfAbsent(
                  entry.getKey(),
                  key ->
                      new AtomicReference<>(
                          new TimeSeriesRuntimeState(
                              outputName2OperatorMap,
                              intermediateResultName2OperatorSupplierMap,
                              systemParameters,
                              windowingProcessor)));
      synchronized (stateReference) {
        try {
          stateReference.get().restoreTimestampAndWindows(entry.getValue());
        } catch (final IOException e) {
          throw new PipeException("Encountered exception when deserializing from PipeTaskMeta", e);
        }
      }
    }
  }