in connectors/spark-tsfile/src/main/java/org/apache/iotdb/spark/tsfile/qp/optimizer/PhysicalOptimizer.java [53:134]
/**
 * Expands one logical query into per-device {@link TSQueryPlan}s for a single TsFile.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>Read every measurement known to the file from {@code in}.
 *   <li>Keep the requested {@code paths} that are neither contained in {@code columnNames} nor
 *       equal to {@link SQLConstant#RESERVED_TIME}.
 *   <li>When a {@code singleQuery} is given: return an empty list if its value filter refers to
 *       a path that is not a measurement of this file, or if {@code flag} is {@code false}
 *       after {@code mergeColumns} has processed the column filters.
 *   <li>Resolve the devices in {@code [start, end]} and either add them all to {@code
 *       validDeltaObjects} or hand them to {@code combination} together with the merged column
 *       filters.
 *   <li>Emit one plan per entry of {@code validDeltaObjects}, prefixing every selected series
 *       (and the value filter, if any) with that device.
 * </ol>
 *
 * <p>NOTE(review): {@code validDeltaObjects} is declared outside this method and is only
 * appended to here, never cleared; presumably each optimizer instance is used for a single call
 * - confirm against callers.
 *
 * @param singleQuery the query to plan; may be {@code null}, in which case no time/value/column
 *     filter is applied
 * @param paths the requested column paths
 * @param in reader of the TsFile being planned against
 * @param start first bound passed to {@code in.getDeviceNameInRange}
 * @param end second bound passed to {@code in.getDeviceNameInRange}
 * @return the plans to execute, one per valid device; empty when the file cannot match
 * @throws IOException declared for failures while reading file metadata through {@code in}
 */
public List<TSQueryPlan> optimize(
    SingleQuery singleQuery, List<String> paths, TsFileSequenceReader in, Long start, Long end)
    throws IOException {
  final Map<String, TSDataType> measurementsInFile = in.getAllMeasurements();

  // Requested series = requested paths minus column names and the reserved time column.
  final List<String> seriesToRead = new ArrayList<>();
  for (String candidate : paths) {
    if (columnNames.contains(candidate) || candidate.equals(SQLConstant.RESERVED_TIME)) {
      continue;
    }
    seriesToRead.add(candidate);
  }

  final boolean hasQuery = singleQuery != null;
  final FilterOperator timeFilter = hasQuery ? singleQuery.getTimeFilterOperator() : null;
  final FilterOperator valueFilter = hasQuery ? singleQuery.getValueFilterOperator() : null;

  Map<String, Set<String>> columnConstraints = null;
  if (hasQuery) {
    // A value filter on a measurement that this file does not contain cannot match anything.
    if (valueFilter != null) {
      boolean filterPathMissing =
          valueFilter.getAllPaths().stream()
              .anyMatch(filterPath -> !measurementsInFile.containsKey(filterPath));
      if (filterPathMissing) {
        return new ArrayList<>();
      }
    }

    // flag must be reset before mergeColumns runs; it is checked right after the call.
    flag = true;
    columnConstraints = mergeColumns(singleQuery.getColumnFilterOperator());
    if (!flag) {
      // e.g. where column1 = 'd1' and column2 = 'd2': nothing should be read
      return new ArrayList<>();
    }
  }

  // Devices are resolved only after all early exits above, so no lookup is wasted.
  final List<String> devicesInRange =
      in.getDeviceNameInRange(start, end).stream()
          .map(id -> ((PlainDeviceID) id).toStringID())
          .collect(Collectors.toList());

  if (!hasQuery || columnConstraints.isEmpty()) {
    // No column constraints: every device in range is valid.
    validDeltaObjects.addAll(devicesInRange);
  } else {
    // Column constraints present: let combination() match them against the devices.
    combination(
        devicesInRange,
        columnConstraints,
        columnConstraints.keySet().toArray(),
        0,
        new String[columnConstraints.size()]);
  }

  if (!seriesToRead.isEmpty()) {
    // Drop requested series that the file does not contain.
    seriesToRead.removeIf(series -> !measurementsInFile.containsKey(series));
  } else {
    // Nothing specific requested: query all measurements of the file.
    seriesToRead.addAll(measurementsInFile.keySet());
  }

  final List<TSQueryPlan> plans = new ArrayList<>();
  for (String device : validDeltaObjects) {
    final String devicePrefix = device + SQLConstant.PATH_SEPARATOR;
    List<String> devicePaths =
        seriesToRead.stream()
            .map(series -> devicePrefix + series)
            .collect(Collectors.toCollection(ArrayList::new));

    // Each plan gets its own copy of the value filter, bound to this device.
    FilterOperator deviceValueFilter = null;
    if (valueFilter != null) {
      deviceValueFilter = valueFilter.clone();
      deviceValueFilter.addHeadDeltaObjectPath(device);
    }
    plans.add(new TSQueryPlan(devicePaths, timeFilter, deviceValueFilter));
  }
  return plans;
}