in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/NormalSchemaFetcher.java [196:392]
void processNormalTimeSeries(
List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList,
MPPQueryContext context) {
// [Step 0] Record the input value.
List<Boolean> isAlignedPutInList = null;
if (config.isAutoCreateSchemaEnabled()) {
isAlignedPutInList =
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::isAligned)
.collect(Collectors.toList());
}
// [Step 1] Cache 1. compute measurements and record logical views.
List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
List<List<Integer>> indexOfMissingMeasurementsList =
new ArrayList<>(schemaComputationWithAutoCreationList.size());
List<Integer> indexOfMissingMeasurements;
for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
indexOfMissingMeasurements =
schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreationList.get(i));
if (!indexOfMissingMeasurements.isEmpty()) {
indexOfDevicesWithMissingMeasurements.add(i);
indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
}
}
// [Step 2] Cache 2. process recorded logical views.
boolean hasUnFetchedLogicalView = false;
List<Pair<List<Integer>, List<String>>> missedIndexAndPathStringOfViewList =
new ArrayList<>(schemaComputationWithAutoCreationList.size());
for (ISchemaComputationWithAutoCreation iSchemaComputationWithAutoCreation :
schemaComputationWithAutoCreationList) {
Pair<List<Integer>, List<String>> missedIndexAndPathString =
schemaCache.computeSourceOfLogicalView(iSchemaComputationWithAutoCreation);
if (!missedIndexAndPathString.left.isEmpty()) {
hasUnFetchedLogicalView = true;
}
missedIndexAndPathStringOfViewList.add(missedIndexAndPathString);
}
// all schema can be taken from cache
if (indexOfDevicesWithMissingMeasurements.isEmpty() && (!hasUnFetchedLogicalView)) {
return;
}
// [Step 3] Fetch 1.fetch schema from remote. Process logical view first; then process
// measurements.
// try fetch the missing schema from remote
ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation;
ClusterSchemaTree remoteSchemaTree;
if (!hasUnFetchedLogicalView) {
remoteSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaOfMultiDevices(
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getDevicePath)
.collect(Collectors.toList()),
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getMeasurements)
.collect(Collectors.toList()),
indexOfDevicesWithMissingMeasurements,
indexOfMissingMeasurementsList,
context);
} else {
PathPatternTree patternTree =
computePatternTreeNeededReFetch(
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getDevicePath)
.collect(Collectors.toList()),
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getMeasurements)
.collect(Collectors.toList()),
indexOfDevicesWithMissingMeasurements,
indexOfMissingMeasurementsList);
List<String> fullPathsNeedReFetch = new ArrayList<>();
for (Pair<List<Integer>, List<String>> pair : missedIndexAndPathStringOfViewList) {
fullPathsNeedReFetch.addAll(pair.right);
}
computePatternTreeNeededReFetch(patternTree, fullPathsNeedReFetch);
remoteSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaWithPatternTreeAndCache(patternTree, context);
}
// make sure all missed views are computed.
for (int i = 0; i < schemaComputationWithAutoCreationList.size(); i++) {
schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
remoteSchemaTree.computeSourceOfLogicalView(
schemaComputationWithAutoCreation, missedIndexAndPathStringOfViewList.get(i).left);
}
// check and compute the fetched schema
List<Integer> indexOfDevicesNeedAutoCreateSchema = new ArrayList<>();
List<List<Integer>> indexOfMeasurementsNeedAutoCreate = new ArrayList<>();
for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
schemaComputationWithAutoCreation =
schemaComputationWithAutoCreationList.get(indexOfDevicesWithMissingMeasurements.get(i));
indexOfMissingMeasurements =
remoteSchemaTree.compute(
schemaComputationWithAutoCreation, indexOfMissingMeasurementsList.get(i));
schemaComputationWithAutoCreation.recordRangeOfLogicalViewSchemaListNow();
if (!indexOfMissingMeasurements.isEmpty()) {
indexOfDevicesNeedAutoCreateSchema.add(indexOfDevicesWithMissingMeasurements.get(i));
indexOfMeasurementsNeedAutoCreate.add(indexOfMissingMeasurements);
}
}
// [Step 4] Fetch 2. Some fetched measurements in [Step 3] are views. Process them.
hasUnFetchedLogicalView = false;
for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
Pair<List<Integer>, List<String>> missedIndexAndPathString =
schemaCache.computeSourceOfLogicalView(schemaComputationWithAutoCreationList.get(i));
if (!missedIndexAndPathString.left.isEmpty()) {
hasUnFetchedLogicalView = true;
}
missedIndexAndPathStringOfViewList.get(i).left = missedIndexAndPathString.left;
missedIndexAndPathStringOfViewList.get(i).right = missedIndexAndPathString.right;
}
if (hasUnFetchedLogicalView) {
List<String> fullPathsNeedRefetch = new ArrayList<>();
for (Pair<List<Integer>, List<String>> pair : missedIndexAndPathStringOfViewList) {
fullPathsNeedRefetch.addAll(pair.right);
}
ClusterSchemaTree viewSchemaTree =
clusterSchemaFetchExecutor.fetchSchemaWithFullPaths(fullPathsNeedRefetch, context);
for (int i = 0, size = schemaComputationWithAutoCreationList.size(); i < size; i++) {
schemaComputationWithAutoCreation = schemaComputationWithAutoCreationList.get(i);
viewSchemaTree.computeSourceOfLogicalView(
schemaComputationWithAutoCreation, missedIndexAndPathStringOfViewList.get(i).left);
}
}
// all schema has been taken and processed
if (indexOfDevicesNeedAutoCreateSchema.isEmpty()) {
return;
}
// [Step 5] Auto Create and process the missing schema
if (config.isAutoCreateSchemaEnabled()) {
List<PartialPath> devicePathList =
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getDevicePath)
.collect(Collectors.toList());
List<Boolean> isAlignedRealList =
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::isAligned)
.collect(Collectors.toList());
// Check the isAligned value. If the input value is different from the actual value of the
// existing device, throw exception.
validateIsAlignedValueIfAutoCreate(isAlignedRealList, isAlignedPutInList, devicePathList);
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
autoCreateSchemaExecutor.autoCreateTimeSeries(
schemaTree,
devicePathList,
indexOfDevicesNeedAutoCreateSchema,
indexOfMeasurementsNeedAutoCreate,
schemaComputationWithAutoCreationList.stream()
.map(ISchemaComputationWithAutoCreation::getMeasurements)
.collect(Collectors.toList()),
schemaComputationWithAutoCreationList.stream()
.map(
o -> {
TSDataType[] dataTypes = new TSDataType[o.getMeasurements().length];
for (int i = 0, length = dataTypes.length; i < length; i++) {
dataTypes[i] = o.getDataType(i);
}
return dataTypes;
})
.collect(Collectors.toList()),
isAlignedPutInList,
context);
indexOfDevicesWithMissingMeasurements = new ArrayList<>();
indexOfMissingMeasurementsList = new ArrayList<>();
for (int i = 0; i < indexOfDevicesNeedAutoCreateSchema.size(); i++) {
schemaComputationWithAutoCreation =
schemaComputationWithAutoCreationList.get(indexOfDevicesNeedAutoCreateSchema.get(i));
indexOfMissingMeasurements =
schemaTree.compute(
schemaComputationWithAutoCreation, indexOfMeasurementsNeedAutoCreate.get(i));
if (!indexOfMissingMeasurements.isEmpty()) {
indexOfDevicesWithMissingMeasurements.add(indexOfDevicesNeedAutoCreateSchema.get(i));
indexOfMissingMeasurementsList.add(indexOfMissingMeasurements);
}
}
// all schema has been taken and processed
if (indexOfDevicesWithMissingMeasurements.isEmpty()) {
return;
}
} else {
indexOfDevicesWithMissingMeasurements = indexOfDevicesNeedAutoCreateSchema;
indexOfMissingMeasurementsList = indexOfMeasurementsNeedAutoCreate;
}
// offer null for the rest missing schema processing
for (int i = 0; i < indexOfDevicesWithMissingMeasurements.size(); i++) {
schemaComputationWithAutoCreation =
schemaComputationWithAutoCreationList.get(indexOfDevicesWithMissingMeasurements.get(i));
for (int index : indexOfMissingMeasurementsList.get(i)) {
schemaComputationWithAutoCreation.computeMeasurement(index, null);
}
}
}