public void createAlignedTimeSeries()

in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java [721:850]


  public void createAlignedTimeSeries(final ICreateAlignedTimeSeriesPlan plan)
      throws MetadataException {
    while (!regionStatistics.isAllowToCreateNewSeries()) {
      ReleaseFlushMonitor.getInstance().waitIfReleasing();
    }

    try {
      final PartialPath prefixPath = plan.getDevicePath();
      final List<String> measurements = plan.getMeasurements();
      final List<TSDataType> dataTypes = plan.getDataTypes();
      final List<TSEncoding> encodings = plan.getEncodings();
      final List<CompressionType> compressors = plan.getCompressors();
      final List<String> aliasList = plan.getAliasList();
      final List<Map<String, String>> tagsList = plan.getTagsList();
      final List<Map<String, String>> attributesList = plan.getAttributesList();
      final List<IMeasurementMNode<ICachedMNode>> measurementMNodeList;

      for (int i = 0; i < measurements.size(); i++) {
        SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
      }

      // Used iff with merge
      final Set<Integer> existingMeasurementIndexes = new HashSet<>();

      // Create time series in MTree
      measurementMNodeList =
          mtree.createAlignedTimeSeries(
              prefixPath,
              measurements,
              dataTypes,
              encodings,
              compressors,
              aliasList,
              (plan instanceof CreateAlignedTimeSeriesPlanImpl
                  && ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()),
              existingMeasurementIndexes);

      try {
        // Update statistics and schemaDataTypeNumMap
        regionStatistics.addMeasurement(measurementMNodeList.size());

        List<Long> tagOffsets = plan.getTagOffsets();

        // Merge the existing ones
        // The existing measurements are written into the "upsert" but not written
        // to the "createSeries" in mLog
        for (int i = measurements.size() - 1; i >= 0; --i) {
          if (existingMeasurementIndexes.isEmpty()) {
            break;
          }
          if (!existingMeasurementIndexes.remove(i)) {
            continue;
          }
          // WARNING: The input lists can not be immutable when the "withMerge" is set.
          upsertAliasAndTagsAndAttributes(
              Objects.nonNull(aliasList) ? aliasList.remove(i) : null,
              Objects.nonNull(tagsList) ? tagsList.remove(i) : null,
              Objects.nonNull(attributesList) ? attributesList.remove(i) : null,
              prefixPath.concatAsMeasurementPath(measurements.get(i)));
          if (Objects.nonNull(tagOffsets) && !tagOffsets.isEmpty()) {
            tagOffsets.remove(i);
          }
          // Nonnull
          measurements.remove(i);
          dataTypes.remove(i);
          encodings.remove(i);
          compressors.remove(i);
        }

        if (measurementMNodeList.isEmpty()) {
          return;
        }

        for (int i = 0; i < measurements.size(); i++) {
          if (tagOffsets != null && !plan.getTagOffsets().isEmpty() && isRecovering) {
            if (tagOffsets.get(i) != -1) {
              tagManager.recoverIndex(plan.getTagOffsets().get(i), measurementMNodeList.get(i));
              mtree.pinMNode(measurementMNodeList.get(i).getAsMNode());
            }
          } else if (tagsList != null && !tagsList.isEmpty()) {
            if (tagsList.get(i) != null) {
              // Tag key, tag value
              tagManager.addIndex(tagsList.get(i), measurementMNodeList.get(i));
              mtree.pinMNode(measurementMNodeList.get(i).getAsMNode());
            }
          }
        }

        // Write log
        tagOffsets = new ArrayList<>();
        if (!isRecovering) {
          if ((tagsList != null && !tagsList.isEmpty())
              || (attributesList != null && !attributesList.isEmpty())) {
            Map<String, String> tags;
            Map<String, String> attributes;
            for (int i = 0; i < measurements.size(); i++) {
              tags = tagsList == null ? null : tagsList.get(i);
              attributes = attributesList == null ? null : attributesList.get(i);
              if (tags == null && attributes == null) {
                tagOffsets.add(-1L);
              } else {
                tagOffsets.add(tagManager.writeTagFile(tags, attributes));
              }
            }
          } else {
            for (int i = 0; i < measurements.size(); i++) {
              tagOffsets.add(-1L);
            }
          }
          plan.setTagOffsets(tagOffsets);
          writeToMLog(plan);
        }
        tagOffsets = plan.getTagOffsets();
        for (int i = 0; i < measurements.size(); i++) {
          if (tagOffsets.get(i) != -1) {
            final long offset = tagOffsets.get(i);
            mtree.updateMNode(
                measurementMNodeList.get(i).getAsMNode(),
                o -> o.getAsMeasurementMNode().setOffset(offset));
          }
        }
      } finally {
        for (final IMeasurementMNode<ICachedMNode> measurementMNode : measurementMNodeList) {
          mtree.unPinMNode(measurementMNode.getAsMNode());
        }
      }
    } catch (final IOException e) {
      throw new MetadataException(e);
    }
  }