in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java [749:868]
/**
 * Creates a set of aligned time series under the plan's device path.
 *
 * <p>Flow: (1) reject early if the region is out of series capacity; (2) validate each
 * dataType/encoding pair; (3) create the series in the in-memory MTree — when the plan is a
 * {@code CreateAlignedTimeSeriesPlanImpl} with {@code isWithMerge()} set, measurements that
 * already exist are reported back via {@code existingMeasurementIndexes} instead of failing;
 * (4) for each pre-existing measurement, upsert its alias/tags/attributes and strip it from the
 * plan's parallel lists so it is not re-logged as a creation; (5) maintain the tag index
 * (recover path vs. live path); (6) when not recovering, persist tag/attribute files, record the
 * resulting offsets on the plan, and write the plan to the mLog; (7) propagate tag offsets onto
 * the new measurement nodes.
 *
 * @param plan aligned-creation plan carrying parallel lists (measurements, dataTypes, encodings,
 *     compressors, and optional aliasList/tagsList/attributesList/tagOffsets). NOTE(review): the
 *     lists must be mutable when withMerge is set, because merged entries are removed in place.
 * @throws SeriesOverflowException if the region cannot accept more series
 * @throws MetadataException on validation failure or wrapped I/O failure from tag persistence
 */
public void createAlignedTimeSeries(final ICreateAlignedTimeSeriesPlan plan)
    throws MetadataException {
  // Capacity gate: fail fast before touching the MTree so no partial state is created.
  if (!regionStatistics.isAllowToCreateNewSeries()) {
    throw new SeriesOverflowException(
        regionStatistics.getGlobalMemoryUsage(), regionStatistics.getGlobalSeriesNumber());
  }
  try {
    final PartialPath prefixPath = plan.getDevicePath();
    final List<String> measurements = plan.getMeasurements();
    final List<TSDataType> dataTypes = plan.getDataTypes();
    final List<TSEncoding> encodings = plan.getEncodings();
    final List<CompressionType> compressors = plan.getCompressors();
    final List<String> aliasList = plan.getAliasList();
    final List<Map<String, String>> tagsList = plan.getTagsList();
    final List<Map<String, String>> attributesList = plan.getAttributesList();
    final List<IMeasurementMNode<IMemMNode>> measurementMNodeList;
    // Validate every (dataType, encoding) pair up front so nothing is created on failure.
    for (int i = 0; i < measurements.size(); i++) {
      SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), encodings.get(i));
    }
    // Used iff with merge: filled by mTree with the indexes of measurements that already exist.
    final Set<Integer> existingMeasurementIndexes = new HashSet<>();
    // create time series in MTree
    // NOTE(review): presumably returns nodes only for the measurements actually created, so
    // after the merge-removal loop below its size matches the trimmed lists — confirm in mTree.
    measurementMNodeList =
        mTree.createAlignedTimeSeries(
            prefixPath,
            measurements,
            dataTypes,
            encodings,
            compressors,
            aliasList,
            (plan instanceof CreateAlignedTimeSeriesPlanImpl
                && ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()),
            existingMeasurementIndexes);
    // update statistics and schemaDataTypeNumMap
    regionStatistics.addMeasurement(measurementMNodeList.size());
    List<Long> tagOffsets = plan.getTagOffsets();
    // Merge the existing ones
    // The existing measurements are written into the "upsert" but not written
    // to the "createSeries" in mLog
    // Iterate in reverse so index-based remove(i) does not shift the indexes not yet visited.
    for (int i = measurements.size() - 1; i >= 0; --i) {
      if (existingMeasurementIndexes.isEmpty()) {
        // All merged entries handled — no need to scan the remaining (lower) indexes.
        break;
      }
      if (!existingMeasurementIndexes.remove(i)) {
        // Index i is a newly created measurement; keep it in the plan's lists.
        continue;
      }
      // WARNING: The input lists can not be immutable when the "withMerge" is set.
      // remove(i) both fetches the entry for the upsert and strips it from the plan,
      // so the mLog "createSeries" record below only covers genuinely new series.
      upsertAliasAndTagsAndAttributes(
          Objects.nonNull(aliasList) ? aliasList.remove(i) : null,
          Objects.nonNull(tagsList) ? tagsList.remove(i) : null,
          Objects.nonNull(attributesList) ? attributesList.remove(i) : null,
          prefixPath.concatAsMeasurementPath(measurements.get(i)));
      if (Objects.nonNull(tagOffsets) && !tagOffsets.isEmpty()) {
        // remove(int) here is removal by index, not by (boxed Long) value.
        tagOffsets.remove(i);
      }
      // Nonnull
      measurements.remove(i);
      dataTypes.remove(i);
      encodings.remove(i);
      compressors.remove(i);
    }
    if (measurementMNodeList.isEmpty()) {
      // Every requested measurement already existed (all merged) — nothing new to log/index.
      return;
    }
    // Maintain the tag inverted index for each newly created measurement node.
    for (int i = 0; i < measurements.size(); i++) {
      if (tagOffsets != null && !tagOffsets.isEmpty() && isRecovering) {
        // Recovery replay: tag files already exist on disk; rebuild the index from offsets.
        if (tagOffsets.get(i) != -1) {
          tagManager.recoverIndex(plan.getTagOffsets().get(i), measurementMNodeList.get(i));
        }
      } else if (tagsList != null && !tagsList.isEmpty()) {
        if (tagsList.get(i) != null) {
          // tag key, tag value
          tagManager.addIndex(tagsList.get(i), measurementMNodeList.get(i));
        }
      }
    }
    // Write log
    // Build a fresh offsets list (one entry per remaining measurement) to record on the plan.
    tagOffsets = new ArrayList<>();
    if (!isRecovering) {
      if ((tagsList != null && !tagsList.isEmpty())
          || (attributesList != null && !attributesList.isEmpty())) {
        Map<String, String> tags;
        Map<String, String> attributes;
        for (int i = 0; i < measurements.size(); i++) {
          tags = tagsList == null ? null : tagsList.get(i);
          attributes = attributesList == null ? null : attributesList.get(i);
          if (tags == null && attributes == null) {
            // -1 marks "no tag file entry" for this measurement.
            tagOffsets.add(-1L);
          } else {
            // Persist tags/attributes and remember where they landed in the tag file.
            tagOffsets.add(tagManager.writeTagFile(tags, attributes));
          }
        }
      } else {
        // No tags or attributes at all: every offset is the "absent" sentinel.
        for (int i = 0; i < measurements.size(); i++) {
          tagOffsets.add(-1L);
        }
      }
      // Record offsets on the plan so the mLog entry carries them for future recovery.
      plan.setTagOffsets(tagOffsets);
      writeToMLog(plan);
    }
    // Re-read from the plan: on recovery this is the replayed offsets, otherwise what we just set.
    tagOffsets = plan.getTagOffsets();
    for (int i = 0; i < measurements.size(); i++) {
      if (tagOffsets.get(i) != -1) {
        measurementMNodeList.get(i).setOffset(tagOffsets.get(i));
      }
    }
  } catch (final IOException e) {
    // Tag-file I/O failure surfaces to callers as a metadata error, cause preserved.
    throw new MetadataException(e);
  }
}