private void joinDifferentCollections()

in calcite-adapter/src/main/java/software/amazon/documentdb/jdbc/calcite/adapter/DocumentDbJoin.java [532:643]


    private void joinDifferentCollections(
            final Implementor implementor,
            final Implementor rightImplementor,
            final String leftCollectionName,
            final String rightCollectionName,
            final DocumentDbSchemaTable leftTable,
            final DocumentDbSchemaTable rightTable) {
        // Remove null rows from right, if any.
        DocumentDbToEnumerableConverter.handleVirtualTable(rightImplementor);

        // Validate that this is a simple equality join.
        validateDifferentCollectionJoin();

        // Determine the new field in the joined documents that will hold the matched rows from the right.
        final String rightMatches = rightTable.getSqlName();

        // Filter out unneeded columns from the left and right sides.
        final Map<String, DocumentDbSchemaColumn> leftColumns = getRequiredColumns(leftTable, this::getLeft);
        final Map<String, DocumentDbSchemaColumn> rightColumns = getRequiredColumns(rightTable, this::getRight);

        // Determine the new metadata. Handle any naming collisions from the right side. Columns
        // from the right will now be nested under field specified by rightMatches.
        final LinkedHashMap<String, DocumentDbSchemaColumn> columnMap = new LinkedHashMap<>(leftColumns);
        final Set<String> usedKeys = new LinkedHashSet<>(columnMap.keySet());
        for (Entry<String, DocumentDbSchemaColumn> entry : rightColumns.entrySet()) {
            final String key = SqlValidatorUtil.uniquify(entry.getKey(), usedKeys, SqlValidatorUtil.EXPR_SUGGESTER);
            final DocumentDbSchemaColumn oldColumn = entry.getValue();
            final DocumentDbMetadataColumn newColumn = DocumentDbMetadataColumn.builder()
                    .sqlName(oldColumn.getSqlName())
                    .fieldPath(oldColumn.getFieldPath())
                    .dbType(oldColumn.getDbType())
                    .isPrimaryKey(oldColumn.isPrimaryKey())
                    .isIndex(oldColumn.isIndex())
                    .foreignKeyColumnName(oldColumn.getForeignKeyColumnName())
                    .foreignKeyTableName(oldColumn.getForeignKeyTableName())
                    .resolvedPath(combinePath(rightMatches, DocumentDbRules.getPath(oldColumn, false)))
                    .build();
            columnMap.put(key, newColumn);
        }
        final DocumentDbMetadataTable metadata = DocumentDbMetadataTable
                .builder()
                .sqlName(leftCollectionName)
                .columns(columnMap)
                .build();
        final DocumentDbTable joinedTable = new DocumentDbTable(leftCollectionName, metadata);
        implementor.setDocumentDbTable(joinedTable);
        implementor.setMetadataTable(metadata);

        // Add the lookup stage. This is the stage that "joins" the 2 collections.
        final JsonBuilder jsonBuilder = new JsonBuilder();
        final Map<String, Object> lookupMap = new LinkedHashMap<>();
        final Map<String, Object> lookupFields = new LinkedHashMap<>();

        // 1. Add collection to join.
        lookupFields.put("from", rightCollectionName);

        // 2. Fields from the left need to be in let so they can be used in $match.
        final Map<String, String> letExpressions =
                leftColumns.values().stream()
                        .collect(
                                Collectors.toMap(
                                        DocumentDbSchemaColumn::getSqlName,
                                        column -> "$" + DocumentDbRules.getPath(column, false)));
        lookupFields.put("let", letExpressions);

        // 3. Add any stages from the right implementor. Convert the json strings
        // into objects so they can be added as a list to the lookup pipeline.
        final List<Map<String, Object>> stages = new ArrayList<>();
        final ObjectMapper mapper = JsonMapper.builder()
                .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
                .build();
        for (Pair<String, String> operations : rightImplementor.getList()) {
            final String stage = operations.right;
            final Map<String, Object> map = mapper.readValue(stage, new TypeReference<LinkedHashMap<String, Object>>() {
            });
            stages.add(map);
        }

        // 4. Determine the $match stage for the pipeline. This is the join condition.
        final JoinTranslator translator = new JoinTranslator(implementor.getRexBuilder(), leftColumns, rightColumns);
        stages.add(translator.translateMatch(getCondition()));

        // 5. Add all stages in order to the pipeline.
        lookupFields.put("pipeline", stages);

        // 6. Add the new field where the matches will be placed.
        lookupFields.put("as", rightMatches);

        lookupMap.put("$lookup", lookupFields);
        implementor.add(null, jsonBuilder.toJsonString(lookupMap));

        // Unwind the matched rows. Preserve null/empty arrays (unmatched rows) depending on join type.
        final UnwindOptions opts = new UnwindOptions();
        switch (getJoinType()) {
            case INNER:
                // Remove rows for which there were no matches.
                opts.preserveNullAndEmptyArrays(false);
                break;
            case LEFT:
                // Keep rows for which there were no matches.
                opts.preserveNullAndEmptyArrays(true);
                break;
            default:
                throw new IllegalArgumentException(SqlError.lookup(SqlError.UNSUPPORTED_JOIN_TYPE, getJoinType().name()));
        }
        implementor.add(null, String.valueOf(Aggregates.unwind("$" + rightMatches, opts)));
        LOGGER.debug("Created join stages of pipeline.");
        LOGGER.debug("Pipeline stages added: {}",
                implementor.getList().stream()
                        .map(c -> c.right)
                        .toArray());
    }