in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java [1818:2007]
public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
  // Translates a JoinNode into the physical join operator that executes it.
  //
  // Steps:
  //   1. Build both child operators and map every requested output symbol of the join to
  //      its column position in the corresponding child's output.
  //   2. Cross joins return a SimpleNestedLoopCrossJoinOperator right away.
  //   3. Otherwise resolve the join key positions (equi-join keys first, then the optional
  //      ASOF key in the last slot) and check that the equi-join key types match.
  //   4. Pick the operator: an ASOF clause is only supported for INNER joins; plain joins
  //      support INNER, FULL and LEFT.
  //
  // Throws IllegalStateException when a child does not expose a required output symbol or
  // join key, when an ASOF key is not of type TIMESTAMP, or when the join type is not
  // supported.
  List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());

  Operator leftChild = node.getLeftChild().accept(this, context);
  Operator rightChild = node.getRightChild().accept(this, context);

  // Positions of the join's left output symbols inside the left child's output.
  ImmutableMap<Symbol, Integer> leftColumnNamesMap =
      makeLayoutFromOutputSymbols(node.getLeftChild().getOutputSymbols());
  int[] leftOutputSymbolIdx = new int[node.getLeftOutputSymbols().size()];
  for (int i = 0; i < leftOutputSymbolIdx.length; i++) {
    Symbol leftOutputSymbol = node.getLeftOutputSymbols().get(i);
    Integer index = leftColumnNamesMap.get(leftOutputSymbol);
    if (index == null) {
      throw new IllegalStateException(
          "Left child of JoinNode doesn't contain LeftOutputSymbol " + leftOutputSymbol);
    }
    leftOutputSymbolIdx[i] = index;
  }

  // Positions of the join's right output symbols inside the right child's output.
  ImmutableMap<Symbol, Integer> rightColumnNamesMap =
      makeLayoutFromOutputSymbols(node.getRightChild().getOutputSymbols());
  int[] rightOutputSymbolIdx = new int[node.getRightOutputSymbols().size()];
  for (int i = 0; i < rightOutputSymbolIdx.length; i++) {
    // Report the missing RIGHT symbol. Reading the left output list here would name the
    // wrong symbol and could fail with IndexOutOfBoundsException when the right side has
    // more output symbols than the left side.
    Symbol rightOutputSymbol = node.getRightOutputSymbols().get(i);
    Integer index = rightColumnNamesMap.get(rightOutputSymbol);
    if (index == null) {
      throw new IllegalStateException(
          "Right child of JoinNode doesn't contain RightOutputSymbol " + rightOutputSymbol);
    }
    rightOutputSymbolIdx[i] = index;
  }

  // cross join does not need time column
  if (node.isCrossJoin()) {
    OperatorContext operatorContext =
        context
            .getDriverContext()
            .addOperatorContext(
                context.getNextOperatorId(),
                node.getPlanNodeId(),
                SimpleNestedLoopCrossJoinOperator.class.getSimpleName());
    return new SimpleNestedLoopCrossJoinOperator(
        operatorContext,
        leftChild,
        rightChild,
        leftOutputSymbolIdx,
        rightOutputSymbolIdx,
        dataTypes);
  }

  semanticCheckForJoin(node);

  JoinNode.AsofJoinClause asofJoinClause = node.getAsofCriteria().orElse(null);
  int equiSize = node.getCriteria().size();
  // One slot per equi-join clause, plus a trailing slot for the ASOF key when present.
  int size = equiSize + (asofJoinClause == null ? 0 : 1);

  int[] leftJoinKeyPositions = new int[size];
  for (int i = 0; i < equiSize; i++) {
    Integer leftJoinKeyPosition = leftColumnNamesMap.get(node.getCriteria().get(i).getLeft());
    if (leftJoinKeyPosition == null) {
      throw new IllegalStateException("Left child of JoinNode doesn't contain left join key.");
    }
    leftJoinKeyPositions[i] = leftJoinKeyPosition;
  }

  // Only the equi-join key types are collected here; the ASOF key type is not appended
  // (it is separately required to be TIMESTAMP below).
  List<Type> joinKeyTypes = new ArrayList<>(size);
  int[] rightJoinKeyPositions = new int[size];
  for (int i = 0; i < equiSize; i++) {
    Integer rightJoinKeyPosition = rightColumnNamesMap.get(node.getCriteria().get(i).getRight());
    if (rightJoinKeyPosition == null) {
      throw new IllegalStateException("Right child of JoinNode doesn't contain right join key.");
    }
    rightJoinKeyPositions[i] = rightJoinKeyPosition;

    Type leftJoinKeyType =
        context.getTypeProvider().getTableModelType(node.getCriteria().get(i).getLeft());
    checkIfJoinKeyTypeMatches(
        leftJoinKeyType,
        context.getTypeProvider().getTableModelType(node.getCriteria().get(i).getRight()));
    joinKeyTypes.add(leftJoinKeyType);
  }

  if (asofJoinClause != null) {
    // The ASOF key occupies the last slot of both key-position arrays.
    Integer leftAsofJoinKeyPosition = leftColumnNamesMap.get(asofJoinClause.getLeft());
    if (leftAsofJoinKeyPosition == null) {
      throw new IllegalStateException(
          "Left child of JoinNode doesn't contain left ASOF main join key.");
    }
    leftJoinKeyPositions[equiSize] = leftAsofJoinKeyPosition;

    Integer rightAsofJoinKeyPosition = rightColumnNamesMap.get(asofJoinClause.getRight());
    if (rightAsofJoinKeyPosition == null) {
      throw new IllegalStateException(
          "Right child of JoinNode doesn't contain right ASOF main join key.");
    }
    rightJoinKeyPositions[equiSize] = rightAsofJoinKeyPosition;

    if (context.getTypeProvider().getTableModelType(asofJoinClause.getLeft()) != TIMESTAMP) {
      throw new IllegalStateException("Type of left ASOF Join key is not TIMESTAMP");
    }
    if (context.getTypeProvider().getTableModelType(asofJoinClause.getRight()) != TIMESTAMP) {
      throw new IllegalStateException("Type of right ASOF Join key is not TIMESTAMP");
    }

    ComparisonExpression.Operator asofOperator = asofJoinClause.getOperator();

    // ASOF is only implemented for INNER joins.
    if (requireNonNull(node.getJoinType()) != JoinNode.JoinType.INNER) {
      throw new IllegalStateException("Unsupported ASOF join type: " + node.getJoinType());
    }

    OperatorContext operatorContext =
        context
            .getDriverContext()
            .addOperatorContext(
                context.getNextOperatorId(),
                node.getPlanNodeId(),
                AsofMergeSortInnerJoinOperator.class.getSimpleName());
    // Second argument: true when the ASOF operator is <= or >= (i.e. it admits equality).
    // Third argument: true when the ASOF operator does not contain "greater".
    boolean asofOperatorAllowsEqual =
        asofOperator == ComparisonExpression.Operator.LESS_THAN_OR_EQUAL
            || asofOperator == ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL;
    return new AsofMergeSortInnerJoinOperator(
        operatorContext,
        leftChild,
        leftJoinKeyPositions,
        leftOutputSymbolIdx,
        rightChild,
        rightJoinKeyPositions,
        rightOutputSymbolIdx,
        JoinKeyComparatorFactory.getAsofComparators(
            joinKeyTypes, asofOperatorAllowsEqual, !asofJoinClause.isOperatorContainsGreater()),
        dataTypes);
  }

  JoinNode.JoinType joinType = requireNonNull(node.getJoinType());
  if (joinType == JoinNode.JoinType.INNER) {
    OperatorContext operatorContext =
        context
            .getDriverContext()
            .addOperatorContext(
                context.getNextOperatorId(),
                node.getPlanNodeId(),
                MergeSortInnerJoinOperator.class.getSimpleName());
    return new MergeSortInnerJoinOperator(
        operatorContext,
        leftChild,
        leftJoinKeyPositions,
        leftOutputSymbolIdx,
        rightChild,
        rightJoinKeyPositions,
        rightOutputSymbolIdx,
        JoinKeyComparatorFactory.getComparators(joinKeyTypes, true),
        dataTypes);
  } else if (joinType == JoinNode.JoinType.FULL) {
    OperatorContext operatorContext =
        context
            .getDriverContext()
            .addOperatorContext(
                context.getNextOperatorId(),
                node.getPlanNodeId(),
                MergeSortFullOuterJoinOperator.class.getSimpleName());
    // The full outer join additionally receives one "update last row" function per
    // equi-join key type.
    return new MergeSortFullOuterJoinOperator(
        operatorContext,
        leftChild,
        leftJoinKeyPositions,
        leftOutputSymbolIdx,
        rightChild,
        rightJoinKeyPositions,
        rightOutputSymbolIdx,
        JoinKeyComparatorFactory.getComparators(joinKeyTypes, true),
        dataTypes,
        joinKeyTypes.stream().map(this::buildUpdateLastRowFunction).collect(Collectors.toList()));
  } else if (joinType == JoinNode.JoinType.LEFT) {
    OperatorContext operatorContext =
        context
            .getDriverContext()
            .addOperatorContext(
                context.getNextOperatorId(),
                node.getPlanNodeId(),
                MergeSortLeftJoinOperator.class.getSimpleName());
    return new MergeSortLeftJoinOperator(
        operatorContext,
        leftChild,
        leftJoinKeyPositions,
        leftOutputSymbolIdx,
        rightChild,
        rightJoinKeyPositions,
        rightOutputSymbolIdx,
        JoinKeyComparatorFactory.getComparators(joinKeyTypes, true),
        dataTypes);
  }

  throw new IllegalStateException("Unsupported join type: " + node.getJoinType());
}