void processNormalTimeSeries()

in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/NormalSchemaFetcher.java [196:392]


  void processNormalTimeSeries(
      List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList,
      MPPQueryContext context) {
    // [Step 0] Record the input value.
    List<Boolean> isAlignedPutInList = null;
    if (config.isAutoCreateSchemaEnabled()) {
      isAlignedPutInList =
          schemaComputationWithAutoCreationList.stream()
              .map(ISchemaComputationWithAutoCreation::isAligned)
              .collect(Collectors.toList());
    }

    // [Step 1] Cache 1. compute measurements and record logical views.
    List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
    List<List<Integer>> indexOfMissingMeasurementsList =
        new ArrayList<>(schemaComputationWithAutoCreationList.size());
    List<Integer> indexOfMissingMeasurements;
    for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
      indexOfMissingMeasurements =
          schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreationList.get(i));
      if (!indexOfMissingMeasurements.isEmpty()) {
        indexOfDevicesWithMissingMeasurements.add(i);
        indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
      }
    }
    // [Step 2] Cache 2. process recorded logical views.
    boolean hasUnFetchedLogicalView = false;
    List<Pair<List<Integer>, List<String>>> missedIndexAndPathStringOfViewList =
        new ArrayList<>(schemaComputationWithAutoCreationList.size());
    for (ISchemaComputationWithAutoCreation iSchemaComputationWithAutoCreation :
        schemaComputationWithAutoCreationList) {
      Pair<List<Integer>, List<String>> missedIndexAndPathString =
          schemaCache.computeSourceOfLogicalView(iSchemaComputationWithAutoCreation);
      if (!missedIndexAndPathString.left.isEmpty()) {
        hasUnFetchedLogicalView = true;
      }
      missedIndexAndPathStringOfViewList.add(missedIndexAndPathString);
    }
    // all schema can be taken from cache
    if (indexOfDevicesWithMissingMeasurements.isEmpty() && (!hasUnFetchedLogicalView)) {
      return;
    }
    // [Step 3] Fetch 1.fetch schema from remote. Process logical view first; then process
    // measurements.
    // try fetch the missing schema from remote
    ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation;
    ClusterSchemaTree remoteSchemaTree;
    if (!hasUnFetchedLogicalView) {
      remoteSchemaTree =
          clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
              schemaComputationWithAutoCreationList.stream()
                  .map(ISchemaComputationWithAutoCreation::getDevicePath)
                  .collect(Collectors.toList()),
              schemaComputationWithAutoCreationList.stream()
                  .map(ISchemaComputationWithAutoCreation::getMeasurements)
                  .collect(Collectors.toList()),
              indexOfDevicesWithMissingMeasurements,
              indexOfMissingMeasurementsList,
              context);
    } else {
      PathPatternTree patternTree =
          computePatternTreeNeededReFetch(
              schemaComputationWithAutoCreationList.stream()
                  .map(ISchemaComputationWithAutoCreation::getDevicePath)
                  .collect(Collectors.toList()),
              schemaComputationWithAutoCreationList.stream()
                  .map(ISchemaComputationWithAutoCreation::getMeasurements)
                  .collect(Collectors.toList()),
              indexOfDevicesWithMissingMeasurements,
              indexOfMissingMeasurementsList);
      List<String> fullPathsNeedReFetch = new ArrayList<>();
      for (Pair<List<Integer>, List<String>> pair : missedIndexAndPathStringOfViewList) {
        fullPathsNeedReFetch.addAll(pair.right);
      }
      computePatternTreeNeededReFetch(patternTree, fullPathsNeedReFetch);
      remoteSchemaTree =
          clusterSchemaFetchExecutor.fetchSchemaWithPatternTreeAndCache(patternTree, context);
    }
    // make sure all missed views are computed.
    for (int i = 0; i < schemaComputationWithAutoCreationList.size(); i++) {
      schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
      remoteSchemaTree.computeSourceOfLogicalView(
          schemaComputationWithAutoCreation, missedIndexAndPathStringOfViewList.get(i).left);
    }
    // check and compute the fetched schema
    List<Integer> indexOfDevicesNeedAutoCreateSchema = new ArrayList<>();
    List<List<Integer>> indexOfMeasurementsNeedAutoCreate = new ArrayList<>();
    for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
      schemaComputationWithAutoCreation =
          schemaComputationWithAutoCreationList.get(indexOfDevicesWithMissingMeasurements.get(i));
      indexOfMissingMeasurements =
          remoteSchemaTree.compute(
              schemaComputationWithAutoCreation, indexOfMissingMeasurementsList.get(i));
      schemaComputationWithAutoCreation.recordRangeOfLogicalViewSchemaListNow();
      if (!indexOfMissingMeasurements.isEmpty()) {
        indexOfDevicesNeedAutoCreateSchema.add(indexOfDevicesWithMissingMeasurements.get(i));
        indexOfMeasurementsNeedAutoCreate.add(indexOfMissingMeasurements);
      }
    }

    // [Step 4] Fetch 2. Some fetched measurements in [Step 3] are views. Process them.
    hasUnFetchedLogicalView = false;
    for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
      Pair<List<Integer>, List<String>> missedIndexAndPathString =
          schemaCache.computeSourceOfLogicalView(schemaComputationWithAutoCreationList.get(i));
      if (!missedIndexAndPathString.left.isEmpty()) {
        hasUnFetchedLogicalView = true;
      }
      missedIndexAndPathStringOfViewList.get(i).left = missedIndexAndPathString.left;
      missedIndexAndPathStringOfViewList.get(i).right = missedIndexAndPathString.right;
    }
    if (hasUnFetchedLogicalView) {
      List<String> fullPathsNeedRefetch = new ArrayList<>();
      for (Pair<List<Integer>, List<String>> pair : missedIndexAndPathStringOfViewList) {
        fullPathsNeedRefetch.addAll(pair.right);
      }
      ClusterSchemaTree viewSchemaTree =
          clusterSchemaFetchExecutor.fetchSchemaWithFullPaths(fullPathsNeedRefetch, context);
      for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
        schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
        viewSchemaTree.computeSourceOfLogicalView(
            schemaComputationWithAutoCreation, missedIndexAndPathStringOfViewList.get(i).left);
      }
    }

    // all schema has been taken and processed
    if (indexOfDevicesNeedAutoCreateSchema.isEmpty()) {
      return;
    }

    // [Step 5] Auto Create and process the missing schema
    if (config.isAutoCreateSchemaEnabled()) {
      List<PartialPath> devicePathList =
          schemaComputationWithAutoCreationList.stream()
              .map(ISchemaComputationWithAutoCreation::getDevicePath)
              .collect(Collectors.toList());
      List<Boolean> isAlignedRealList =
          schemaComputationWithAutoCreationList.stream()
              .map(ISchemaComputationWithAutoCreation::isAligned)
              .collect(Collectors.toList());
      // Check the isAligned value. If the input value is different from the actual value of the
      // existing device, throw exception.
      validateIsAlignedValueIfAutoCreate(isAlignedRealList, isAlignedPutInList, devicePathList);

      ClusterSchemaTree schemaTree = new ClusterSchemaTree();
      autoCreateSchemaExecutor.autoCreateTimeSeries(
          schemaTree,
          devicePathList,
          indexOfDevicesNeedAutoCreateSchema,
          indexOfMeasurementsNeedAutoCreate,
          schemaComputationWithAutoCreationList.stream()
              .map(ISchemaComputationWithAutoCreation::getMeasurements)
              .collect(Collectors.toList()),
          schemaComputationWithAutoCreationList.stream()
              .map(
                  o -> {
                    TSDataType[] dataTypes = new TSDataType[o.getMeasurements().length];
                    for (int i = 0, length = dataTypes.length; i < length; i++) {
                      dataTypes[i] = o.getDataType(i);
                    }
                    return dataTypes;
                  })
              .collect(Collectors.toList()),
          isAlignedPutInList,
          context);
      indexOfDevicesWithMissingMeasurements = new ArrayList<>();
      indexOfMissingMeasurementsList = new ArrayList<>();
      for (int i = 0; i < indexOfDevicesNeedAutoCreateSchema.size(); i++) {
        schemaComputationWithAutoCreation =
            schemaComputationWithAutoCreationList.get(indexOfDevicesNeedAutoCreateSchema.get(i));
        indexOfMissingMeasurements =
            schemaTree.compute(
                schemaComputationWithAutoCreation, indexOfMeasurementsNeedAutoCreate.get(i));
        if (!indexOfMissingMeasurements.isEmpty()) {
          indexOfDevicesWithMissingMeasurements.add(indexOfDevicesNeedAutoCreateSchema.get(i));
          indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
        }
      }

      // all schema has been taken and processed
      if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
        return;
      }
    } else {
      indexOfDevicesWithMissingMeasurements = indexOfDevicesNeedAutoCreateSchema;
      indexOfMissingMeasurementsList = indexOfMeasurementsNeedAutoCreate;
    }

    // offer null for the rest missing schema processing
    for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
      schemaComputationWithAutoCreation =
          schemaComputationWithAutoCreationList.get(indexOfDevicesWithMissingMeasurements.get(i));
      for (int index : indexOfMissingMeasurementsList.get(i)) {
        schemaComputationWithAutoCreation.computeMeasurement(index, null);
      }
    }
  }