in calcite-adapter/src/main/java/software/amazon/documentdb/jdbc/calcite/adapter/DocumentDbJoin.java [173:308]
private void joinSameCollection(
final Implementor implementor,
final Implementor rightImplementor,
final String collectionName,
final DocumentDbSchemaTable leftTable,
final DocumentDbSchemaTable rightTable) {
validateSameCollectionJoin(leftTable, rightTable);
final List<Pair<String, String>> leftList = implementor.getList();
implementor.setList(new ArrayList<>());
// Eliminate null (i.e. "unmatched") rows from any virtual tables based on join type.
// If an inner join, eliminate any null rows from either table.
// If a left outer join, eliminate the null rows of the left side.
// If a right outer join, eliminate the null rows of the right side.
final ImmutableCollection<DocumentDbSchemaColumn> leftFilterColumns = getFilterColumns(leftTable);
final ImmutableCollection<DocumentDbSchemaColumn> rightFilterColumns = getFilterColumns(rightTable);
final Supplier<String> leftFilter = () -> buildFieldsExistMatchFilter(leftFilterColumns);
final Supplier<String> rightFilter = () -> buildFieldsExistMatchFilter(rightFilterColumns);
final String filterLeft;
final String filterRight;
final boolean rightIsVirtual = isTableVirtual(rightTable);
final boolean leftIsVirtual = isTableVirtual(leftTable);
// Filter out unneeded columns from the left and right sides.
final Map<String, DocumentDbSchemaColumn> leftColumns = getRequiredColumns(leftTable, this::getLeft);
final Map<String, DocumentDbSchemaColumn> rightColumns = getRequiredColumns(rightTable, this::getRight);
// Create a new metadata table representing the denormalized form that will be used
// in later parts of the query. Resolve collisions from the right table.
final LinkedHashMap<String, DocumentDbSchemaColumn> columnMap = new LinkedHashMap<>(leftColumns);
final List<String> resolutions = new ArrayList<>();
boolean resolutionNeedsUnwind = implementor.isResolutionNeedsUnwind() || rightImplementor.isResolutionNeedsUnwind();
final Set<String> usedKeys = new LinkedHashSet<>(columnMap.keySet());
for (Entry<String, DocumentDbSchemaColumn> entry : rightColumns.entrySet()) {
final String key = entry.getKey();
if (columnMap.containsKey(key)) {
final String newKey =
SqlValidatorUtil.uniquify(key, usedKeys, SqlValidatorUtil.EXPR_SUGGESTER);
final DocumentDbSchemaColumn leftColumn = columnMap.get(key);
// If the columns correspond to the same field, they may have different values depending on
// join type. Create a new column and add a new field.
if (entry.getValue().getFieldPath().equals(leftColumn.getFieldPath())) {
columnMap.put(newKey, entry.getValue());
final DocumentDbSchemaColumn column = entry.getValue();
final DocumentDbSchemaColumn newRightColumn =
DocumentDbMetadataColumn.builder()
.fieldPath(column.getFieldPath())
.sqlName(newKey)
.sqlType(column.getSqlType())
.dbType(column.getDbType())
.isIndex(column.isIndex())
.isPrimaryKey(column.isPrimaryKey())
.foreignKeyTableName(column.getForeignKeyTableName())
.foreignKeyColumnName(column.getForeignKeyColumnName())
.resolvedPath(newKey)
.build();
columnMap.put(newKey, newRightColumn);
resolutionNeedsUnwind = column.isIndex() || resolutionNeedsUnwind;
// Handle any column renames.
final String leftPath = DocumentDbRules.getPath(leftColumn, true);
final String rightPath = DocumentDbRules.getPath(entry.getValue(), true);
handleColumnRename(
resolutions, newKey, rightPath, rightIsVirtual, rightFilterColumns);
handleColumnRename(
resolutions,
leftPath,
leftPath,
leftIsVirtual,
leftFilterColumns);
} else {
columnMap.put(newKey, entry.getValue());
}
} else {
columnMap.put(key, entry.getValue());
}
}
implementor.setResolutionNeedsUnwind(resolutionNeedsUnwind);
// Add any unwinds from the right.
rightImplementor.getUnwinds().forEach(op -> {
if (!implementor.getUnwinds().contains(op)) {
implementor.addUnwind(op);
}
});
// Add the renames.
if (!resolutions.isEmpty()) {
final String newFields = Util.toString(resolutions, "{", ", ", "}");
final String aggregateString = "{ $addFields : " + newFields + "}";
implementor.addCollisionResolution(aggregateString);
}
switch (getJoinType()) {
case INNER:
filterLeft = leftFilter.get();
filterRight = rightFilter.get();
if (filterLeft != null) {
implementor.add(null, filterLeft);
}
if (filterRight != null) {
implementor.add(null, filterRight);
}
implementor.setNullFiltered(true);
break;
case LEFT:
filterLeft = leftFilter.get();
if (filterLeft != null) {
implementor.add(null, filterLeft);
}
implementor.setNullFiltered(true);
break;
default:
throw new IllegalArgumentException(
SqlError.lookup(SqlError.UNSUPPORTED_JOIN_TYPE, getJoinType().name()));
}
// Add any remaining operations from the left.
leftList.forEach(pair -> implementor.add(pair.left, pair.right));
// Add remaining operations from the right.
rightImplementor.getList().forEach(pair -> implementor.add(pair.left, pair.right));
final DocumentDbMetadataTable metadata = DocumentDbMetadataTable
.builder()
.sqlName(leftTable.getSqlName())
.collectionName(collectionName)
.columns(columnMap)
.build();
final DocumentDbTable joinedTable = new DocumentDbTable(collectionName, metadata);
implementor.setDocumentDbTable(joinedTable);
implementor.setMetadataTable(metadata);
}