in calcite-adapter/src/main/java/software/amazon/documentdb/jdbc/calcite/adapter/DocumentDbJoin.java [532:643]
private void joinDifferentCollections(
final Implementor implementor,
final Implementor rightImplementor,
final String leftCollectionName,
final String rightCollectionName,
final DocumentDbSchemaTable leftTable,
final DocumentDbSchemaTable rightTable) {
// Remove null rows from right, if any.
DocumentDbToEnumerableConverter.handleVirtualTable(rightImplementor);
// Validate that this is a simple equality join.
validateDifferentCollectionJoin();
// Determine the new field in the joined documents that will hold the matched rows from the right.
final String rightMatches = rightTable.getSqlName();
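// For example (hypothetical names): if the right table's SQL name is "orders", every joined
// document will carry its matched right-side rows in an array field named "orders".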
// Filter out unneeded columns from the left and right sides.
final Map<String, DocumentDbSchemaColumn> leftColumns = getRequiredColumns(leftTable, this::getLeft);
final Map<String, DocumentDbSchemaColumn> rightColumns = getRequiredColumns(rightTable, this::getRight);
// Determine the new metadata. Handle any naming collisions from the right side. Columns
// from the right will now be nested under the field specified by rightMatches.
final LinkedHashMap<String, DocumentDbSchemaColumn> columnMap = new LinkedHashMap<>(leftColumns);
final Set<String> usedKeys = new LinkedHashSet<>(columnMap.keySet());
for (Entry<String, DocumentDbSchemaColumn> entry : rightColumns.entrySet()) {
final String key = SqlValidatorUtil.uniquify(entry.getKey(), usedKeys, SqlValidatorUtil.EXPR_SUGGESTER);
final DocumentDbSchemaColumn oldColumn = entry.getValue();
final DocumentDbMetadataColumn newColumn = DocumentDbMetadataColumn.builder()
.sqlName(oldColumn.getSqlName())
.fieldPath(oldColumn.getFieldPath())
.dbType(oldColumn.getDbType())
.isPrimaryKey(oldColumn.isPrimaryKey())
.isIndex(oldColumn.isIndex())
.foreignKeyColumnName(oldColumn.getForeignKeyColumnName())
.foreignKeyTableName(oldColumn.getForeignKeyTableName())
.resolvedPath(combinePath(rightMatches, DocumentDbRules.getPath(oldColumn, false)))
.build();
columnMap.put(key, newColumn);
}
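// Hypothetical example of collision handling: if both sides expose a column keyed "id", the
// right-side column is re-registered under a suffixed key (e.g. "id0"), while its sqlName and
// resolvedPath still point at the right-side data nested under rightMatches.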
final DocumentDbMetadataTable metadata = DocumentDbMetadataTable
.builder()
.sqlName(leftCollectionName)
.columns(columnMap)
.build();
final DocumentDbTable joinedTable = new DocumentDbTable(leftCollectionName, metadata);
implementor.setDocumentDbTable(joinedTable);
implementor.setMetadataTable(metadata);
// Add the $lookup stage. This is the stage that "joins" the two collections.
final JsonBuilder jsonBuilder = new JsonBuilder();
final Map<String, Object> lookupMap = new LinkedHashMap<>();
final Map<String, Object> lookupFields = new LinkedHashMap<>();
// 1. Add collection to join.
lookupFields.put("from", rightCollectionName);
// 2. Fields from the left must be exposed as "let" variables so the join's $match stage can reference them.
final Map<String, String> letExpressions =
leftColumns.values().stream()
.collect(
Collectors.toMap(
DocumentDbSchemaColumn::getSqlName,
column -> "$" + DocumentDbRules.getPath(column, false)));
lookupFields.put("let", letExpressions);
// 3. Add any stages from the right implementor. Convert the JSON strings
// into objects so they can be added as a list to the lookup pipeline.
final List<Map<String, Object>> stages = new ArrayList<>();
final ObjectMapper mapper = JsonMapper.builder()
.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
.build();
for (Pair<String, String> operations : rightImplementor.getList()) {
final String stage = operations.right;
try {
final Map<String, Object> map = mapper.readValue(stage, new TypeReference<LinkedHashMap<String, Object>>() {
});
stages.add(map);
} catch (JsonProcessingException e) {
// Stages are generated internally, so a parse failure indicates a bug rather than bad input.
throw new IllegalStateException("Failed to parse pipeline stage: " + stage, e);
}
}
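// Each right-side entry is a JSON string such as "{'$project': {...}}" (single-quoted, hence
// ALLOW_SINGLE_QUOTES above). Parsing it into a map lets JsonBuilder re-serialize it as a
// nested object inside the pipeline rather than as an escaped string.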
// 4. Determine the $match stage for the pipeline. This is the join condition.
final JoinTranslator translator = new JoinTranslator(implementor.getRexBuilder(), leftColumns, rightColumns);
stages.add(translator.translateMatch(getCondition()));
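// A sketch of a translated equi-join condition (hypothetical column names). Inside a $lookup
// pipeline, left-side values are visible only as "let" variables ("$$" references):
//   { "$match": { "$expr": { "$eq": [ "$$customerId", "$customerId" ] } } }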
// 5. Add all stages in order to the pipeline.
lookupFields.put("pipeline", stages);
// 6. Add the new field where the matches will be placed.
lookupFields.put("as", rightMatches);
lookupMap.put("$lookup", lookupFields);
implementor.add(null, jsonBuilder.toJsonString(lookupMap));
// Unwind the matched rows. Whether null/empty arrays (unmatched rows) are preserved depends on the join type.
final UnwindOptions opts = new UnwindOptions();
switch (getJoinType()) {
case INNER:
// Remove rows for which there were no matches.
opts.preserveNullAndEmptyArrays(false);
break;
case LEFT:
// Keep rows for which there were no matches.
opts.preserveNullAndEmptyArrays(true);
break;
default:
throw new IllegalArgumentException(SqlError.lookup(SqlError.UNSUPPORTED_JOIN_TYPE, getJoinType().name()));
}
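// For a LEFT join, the serialized stage looks like (hypothetical field name):
//   { "$unwind": { "path": "$orders", "preserveNullAndEmptyArrays": true } }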
// Serialize the stage explicitly rather than relying on the Bson implementation's toString().
implementor.add(null, Aggregates.unwind("$" + rightMatches, opts)
.toBsonDocument(BsonDocument.class, MongoClientSettings.getDefaultCodecRegistry())
.toJson());
LOGGER.debug("Created join stages of pipeline.");
LOGGER.debug("Pipeline stages added: {}",
implementor.getList().stream()
.map(c -> c.right)
.toArray());
}