public void customize()

in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java [186:376]


  public void customize(
      final PipeParameters parameters, final PipeProcessorRuntimeConfiguration configuration)
      throws Exception {
    // Configures this processor from the pipe parameters and the runtime configuration. In order:
    //   1. resolves the pipe name and the database name of the data region;
    //   2. registers this instance in the per-pipe reference count and runtime state maps;
    //   3. loads the output delay, report interval and output database parameters;
    //   4. maps every requested operator name to its output measurement name;
    //   5. collects the matching aggregators and intermediate result suppliers from all
    //      AbstractOperatorProcessor plugins and validates that nothing requested is missing;
    //   6. resolves the windowing processor;
    //   7. restores the per-time-series window state from the progress index of the task meta.
    // A PipeException is thrown when an operator / output name cannot be matched, a declared
    // intermediate value has no supplier, the windowing plugin has the wrong type, the progress
    // index has an unsupported type, or the stored window state cannot be deserialized.
    final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment();
    pipeName = environment.getPipeName();
    // The database name is resolved once and reused for the separator-suffixed form below
    dataBaseName =
        StorageEngine.getInstance()
            .getDataRegion(new DataRegionId(environment.getRegionId()))
            .getDatabaseName();
    if (dataBaseName != null) {
      isTableModel = PathUtils.isTableModelDatabase(dataBaseName);
    }

    pipeName2referenceCountMap.compute(
        pipeName, (name, count) -> Objects.nonNull(count) ? count + 1 : 1);
    pipeName2timeSeries2TimeSeriesRuntimeStateMap.putIfAbsent(pipeName, new ConcurrentHashMap<>());

    databaseWithPathSeparator = dataBaseName + TsFileConstant.PATH_SEPARATOR;

    pipeTaskMeta = ((PipeTaskProcessorRuntimeEnvironment) environment).getPipeTaskMeta();

    // Load parameters
    final long outputMaxDelaySeconds =
        parameters.getLongOrDefault(
            PROCESSOR_OUTPUT_MAX_DELAY_SECONDS_KEY,
            PROCESSOR_OUTPUT_MAX_DELAY_SECONDS_DEFAULT_VALUE);
    // The output max delay milliseconds must be set to at least 1
    // to guarantee the correctness of the CAS in last receive time.
    // A negative value means "no max delay". TimeUnit#toMillis saturates at Long.MAX_VALUE
    // instead of wrapping around, so a very large configured delay can no longer overflow into
    // a negative product that Math.max would silently clamp to 1 millisecond.
    outputMaxDelayMilliseconds =
        outputMaxDelaySeconds < 0
            ? Long.MAX_VALUE
            : Math.max(java.util.concurrent.TimeUnit.SECONDS.toMillis(outputMaxDelaySeconds), 1);
    // Saturating conversion as well, for the same overflow reason
    outputMinReportIntervalMilliseconds =
        java.util.concurrent.TimeUnit.SECONDS.toMillis(
            parameters.getLongOrDefault(
                PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_KEY,
                PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_DEFAULT_VALUE));
    final String outputDatabase =
        parameters.getStringOrDefault(
            PROCESSOR_OUTPUT_DATABASE_KEY, PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE);
    // An empty output database stays empty; otherwise the path separator is appended
    outputDatabaseWithPathSeparator =
        outputDatabase.isEmpty() ? outputDatabase : outputDatabase + TsFileConstant.PATH_SEPARATOR;

    // Set output name
    // Operator names are given as a comma separated list; spaces are stripped before splitting
    final List<String> operatorNameList =
        Arrays.stream(
                parameters
                    .getStringOrDefault(PROCESSOR_OPERATORS_KEY, PROCESSOR_OPERATORS_DEFAULT_VALUE)
                    .replace(" ", "")
                    .split(","))
            .collect(Collectors.toList());

    final String outputMeasurementString =
        parameters.getStringOrDefault(
            PROCESSOR_OUTPUT_MEASUREMENTS_KEY, PROCESSOR_OUTPUT_MEASUREMENTS_DEFAULT_VALUE);
    final List<String> outputMeasurementNameList =
        outputMeasurementString.isEmpty()
            ? Collections.emptyList()
            : Arrays.stream(outputMeasurementString.replace(" ", "").split(","))
                .collect(Collectors.toList());

    // Operators are paired with output measurement names by position. An operator without a
    // positional counterpart uses its own name (as written by the user) as the output name.
    final Map<String, String> aggregatorName2OutputNameMap = new HashMap<>();
    for (int i = 0; i < operatorNameList.size(); ++i) {
      if (i < outputMeasurementNameList.size()) {
        aggregatorName2OutputNameMap.put(
            operatorNameList.get(i).toLowerCase(), outputMeasurementNameList.get(i));
      } else {
        aggregatorName2OutputNameMap.put(
            operatorNameList.get(i).toLowerCase(), operatorNameList.get(i));
      }
    }

    // Load the useful aggregators' and their corresponding intermediate results' computational
    // logic.
    final Set<String> declaredIntermediateResultSet = new HashSet<>();
    final PipeDataRegionPluginAgent agent = PipeDataNodeAgent.plugin().dataRegion();
    for (final String pipePluginName :
        agent.getSubProcessorNamesWithSpecifiedParent(AbstractOperatorProcessor.class)) {
      // Children are allowed to validate and configure the computational logic
      // from the same parameters other than processor name
      final AbstractOperatorProcessor operatorProcessor =
          (AbstractOperatorProcessor)
              agent.getConfiguredProcessor(pipePluginName, parameters, configuration);
      // Keep only the aggregators that were requested (matched case-insensitively by name)
      operatorProcessor.getAggregatorOperatorSet().stream()
          .filter(
              operator ->
                  aggregatorName2OutputNameMap.containsKey(operator.getName().toLowerCase()))
          .forEach(
              operator -> {
                outputName2OperatorMap.put(
                    aggregatorName2OutputNameMap.get(operator.getName().toLowerCase()), operator);
                declaredIntermediateResultSet.addAll(operator.getDeclaredIntermediateValueNames());
              });

      // All suppliers are registered here; the unneeded ones are pruned after the loop
      operatorProcessor
          .getIntermediateResultOperatorSupplierSet()
          .forEach(
              supplier ->
                  intermediateResultName2OperatorSupplierMap.put(
                      supplier.get().getName(), supplier));
      operatorProcessors.add(operatorProcessor);
    }

    // Whatever remains after removing the resolved output names was requested but not provided
    aggregatorName2OutputNameMap
        .entrySet()
        .removeIf(entry -> outputName2OperatorMap.containsKey(entry.getValue()));
    if (!aggregatorName2OutputNameMap.isEmpty()) {
      throw new PipeException(
          String.format(
              "The aggregator and output name %s is invalid.", aggregatorName2OutputNameMap));
    }

    // Drop the suppliers nobody declared, then check that every declared value has a supplier
    intermediateResultName2OperatorSupplierMap.keySet().retainAll(declaredIntermediateResultSet);
    declaredIntermediateResultSet.removeAll(intermediateResultName2OperatorSupplierMap.keySet());
    if (!declaredIntermediateResultSet.isEmpty()) {
      throw new PipeException(
          String.format(
              "The needed intermediate values %s are not defined.", declaredIntermediateResultSet));
    }

    // Set up column name strings
    columnNameStringList = new String[outputName2OperatorMap.size()];
    final List<String> operatorNames = new ArrayList<>(outputName2OperatorMap.keySet());
    for (int i = 0; i < outputName2OperatorMap.size(); ++i) {
      columnNameStringList[i] = operatorNames.get(i);
    }

    // Get windowing processor
    final String processorName =
        parameters.getStringOrDefault(
                PROCESSOR_WINDOWING_STRATEGY_KEY, PROCESSOR_WINDOWING_STRATEGY_DEFAULT_VALUE)
            + WINDOWING_PROCESSOR_SUFFIX;
    final PipeProcessor windowProcessor =
        agent.getConfiguredProcessor(processorName, parameters, configuration);
    if (!(windowProcessor instanceof AbstractWindowingProcessor)) {
      throw new PipeException(
          String.format("The processor %s is not a windowing processor.", processorName));
    }
    windowingProcessor = (AbstractWindowingProcessor) windowProcessor;

    // Configure system parameters
    systemParameters.put(
        UDFParametersFactory.TIMESTAMP_PRECISION,
        CommonDescriptor.getInstance().getConfig().getTimestampPrecision());

    // The aggregated result operators can be configured here because they are global
    // and stateless, needing only one configuration
    this.outputName2OperatorMap
        .values()
        .forEach(operator -> operator.configureSystemParameters(systemParameters));

    // Restore window state
    final ProgressIndex index = pipeTaskMeta.getProgressIndex();
    // The minimum progress index carries no window state, so there is nothing to restore
    if (index == MinimumProgressIndex.INSTANCE) {
      return;
    }
    if (!(index instanceof TimeWindowStateProgressIndex)) {
      throw new PipeException(
          String.format(
              "The aggregate processor does not support progressIndexType %s", index.getType()));
    }

    final TimeWindowStateProgressIndex timeWindowStateProgressIndex =
        (TimeWindowStateProgressIndex) index;
    // The per-pipe state map was registered above; look it up once instead of per entry
    final Map<String, AtomicReference<TimeSeriesRuntimeState>> timeSeries2RuntimeStateMap =
        pipeName2timeSeries2TimeSeriesRuntimeStateMap.get(pipeName);
    for (final Map.Entry<String, Pair<Long, ByteBuffer>> entry :
        timeWindowStateProgressIndex.getTimeSeries2TimestampWindowBufferPairMap().entrySet()) {
      final AtomicReference<TimeSeriesRuntimeState> stateReference =
          timeSeries2RuntimeStateMap.computeIfAbsent(
              entry.getKey(),
              key ->
                  new AtomicReference<>(
                      new TimeSeriesRuntimeState(
                          outputName2OperatorMap,
                          intermediateResultName2OperatorSupplierMap,
                          systemParameters,
                          windowingProcessor)));
      // The restore is done while holding the monitor of the state reference
      synchronized (stateReference) {
        try {
          stateReference.get().restoreTimestampAndWindows(entry.getValue());
        } catch (final IOException e) {
          throw new PipeException("Encountered exception when deserializing from PipeTaskMeta", e);
        }
      }
    }
  }