protected ILogicalOperator createBTreeIndexSearchPlan()

in asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/BTreeAccessMethod.java [337:801]


    /**
     * Builds the operator plan that replaces the data-source scan of {@code indexSubTree} with a BTree search
     * on {@code chosenIndex}.
     * <p>
     * The method works in four phases:
     * <ol>
     * <li>It walks the function expressions matched to the chosen index and folds them into per-key low/high
     * bounds (search expression, limit type and inclusiveness).</li>
     * <li>It decides whether only a prefix of the keys can be searched and whether the search results need
     * post-processing; in the latter case every bound is widened to inclusive.</li>
     * <li>It creates the search-key variables/expressions, fills a {@code BTreeJobGenParams} and picks the
     * operator that feeds the index search.</li>
     * <li>It creates the secondary-index unnest-map and then, depending on the dataset type and on whether
     * the chosen index is the primary index, an external-data lookup, the rest of the secondary-index plan,
     * or a (left-outer) unnest-map over the primary index.</li>
     * </ol>
     *
     * @param afterTopOpRefs forwarded to {@code AccessMethodUtils.createRestOfIndexSearchPlan}
     * @param topOpRef forwarded to {@code AccessMethodUtils.createRestOfIndexSearchPlan}
     * @param conditionRef the condition being optimized; for a primary-index search that needs no
     *            post-processing its value is replaced by the remaining conditions, or by {@code null} when
     *            nothing remains
     * @param assignBeforeTheOpRefs forwarded to {@code AccessMethodUtils.createRestOfIndexSearchPlan}
     * @param indexSubTree the subtree whose data-source scan is replaced; supplies the dataset and record types
     * @param probeSubTree {@code null} when optimizing a selection; otherwise the probe side of a join, whose
     *            root feeds the index search
     * @param chosenIndex the index to search
     * @param analysisCtx supplies the matched function expressions, the number of matched keys and the
     *            index-only plan information
     * @param retainInput passed to the job-gen parameters and to the created unnest-map operators
     * @param retainMissing together with {@code retainInput} decides whether a left-outer unnest-map is
     *            created for a primary-index search
     * @param requiresBroadcast passed to the job-gen parameters
     * @param context used to create new variables and to compute type environments
     * @param newMissingNullPlaceHolderForLOJ forwarded to {@code AccessMethodUtils.createRestOfIndexSearchPlan}
     * @param leftOuterMissingValue missing value handed to the left-outer unnest-map operators
     * @param chosenIndexKeyFieldNames field names of the index keys, used to locate the key position of each
     *            matched expression
     * @param chosenIndexKeyFieldTypes types of the index keys, indexed by key position
     * @param chosenIndexKeyFieldSourceIndicators source indicators of the index keys, used together with the
     *            field names to locate the key position
     * @return the root operator of the index-search plan, or {@code null} when the plan cannot be built (the
     *         limit type of an expression is unknown, two bounds on the same key cannot be combined, the
     *         input of a key-less selection search is not unpartitioned, or the remaining condition cannot be
     *         computed)
     * @throws AlgebricksException when no field of a matched function expression is a key of the chosen index
     *             (raised as a {@code CompilationException} with
     *             {@code ErrorCode.NO_INDEX_FIELD_NAME_FOR_GIVEN_FUNC_EXPR}); helpers called here may also
     *             throw
     */
    protected ILogicalOperator createBTreeIndexSearchPlan(List<Mutable<ILogicalOperator>> afterTopOpRefs,
            Mutable<ILogicalOperator> topOpRef, Mutable<ILogicalExpression> conditionRef,
            List<Mutable<ILogicalOperator>> assignBeforeTheOpRefs, OptimizableOperatorSubTree indexSubTree,
            OptimizableOperatorSubTree probeSubTree, Index chosenIndex, AccessMethodAnalysisContext analysisCtx,
            boolean retainInput, boolean retainMissing, boolean requiresBroadcast, IOptimizationContext context,
            LogicalVariable newMissingNullPlaceHolderForLOJ, IAlgebricksConstantValue leftOuterMissingValue,
            List<List<String>> chosenIndexKeyFieldNames, List<IAType> chosenIndexKeyFieldTypes,
            List<Integer> chosenIndexKeyFieldSourceIndicators) throws AlgebricksException {
        Dataset dataset = indexSubTree.getDataset();
        ARecordType recordType = indexSubTree.getRecordType();
        ARecordType metaRecordType = indexSubTree.getMetaRecordType();
        // The caller has made sure that indexSubTree has a data-source scan, so this cast is expected to hold.
        AbstractDataSourceOperator dataSourceOp =
                (AbstractDataSourceOperator) indexSubTree.getDataSourceRef().getValue();
        List<Pair<Integer, Integer>> exprAndVarList = analysisCtx.getIndexExprsFromIndexExprsAndVars(chosenIndex);
        int numSecondaryKeys = analysisCtx.getNumberOfMatchedKeys(chosenIndex);

        // Whether the given plan is an index-only plan or not (first element of the quadruple).
        Quadruple<Boolean, Boolean, Boolean, Boolean> indexOnlyPlanInfo = analysisCtx.getIndexOnlyPlanInfo();
        boolean isIndexOnlyPlan = indexOnlyPlanInfo.getFirst();

        // We only apply an index-only plan to an internal dataset.
        boolean generateInstantTrylockResultFromIndexSearch = false;
        if (dataset.getDatasetType() == DatasetType.INTERNAL && isIndexOnlyPlan) {
            generateInstantTrylockResultFromIndexSearch = true;
        }

        // Set of function expressions that will be replaced by the secondary-index search.
        // For a primary-index search without post-processing they are removed from the condition further below.
        Set<ILogicalExpression> replacedFuncExprs = new HashSet<>();
        // Info on high and low keys for the BTree search predicate. All arrays are indexed by key position.
        ILogicalExpression[] lowKeyExprs = new ILogicalExpression[numSecondaryKeys];
        ILogicalExpression[] highKeyExprs = new ILogicalExpression[numSecondaryKeys];
        LimitType[] lowKeyLimits = new LimitType[numSecondaryKeys];
        LimitType[] highKeyLimits = new LimitType[numSecondaryKeys];
        boolean[] lowKeyInclusive = new boolean[numSecondaryKeys];
        boolean[] highKeyInclusive = new boolean[numSecondaryKeys];
        // Search keys that are function calls: the original expression and the variable that will hold its value.
        ILogicalExpression[] lowKeyConstAtRuntimeExpressions = new ILogicalExpression[numSecondaryKeys];
        ILogicalExpression[] highKeyConstantAtRuntimeExpressions = new ILogicalExpression[numSecondaryKeys];
        LogicalVariable[] lowKeyConstAtRuntimeExprVars = new LogicalVariable[numSecondaryKeys];
        LogicalVariable[] highKeyConstAtRuntimeExprVars = new LogicalVariable[numSecondaryKeys];

        /* TODO: For now we don't do any sophisticated analysis of the func exprs to come up with "the best" range
         * predicate. If we can't figure out how to integrate a certain funcExpr into the current predicate,
         * we just bail by setting this flag.*/
        boolean couldntFigureOut = false;
        boolean doneWithExprs = false;
        boolean isEqCondition = false;
        boolean anyRealTypeConvertedToIntegerType = false;
        // Key positions that received an EQ bound; only the EQUAL case below sets bits in these sets.
        BitSet setLowKeys = new BitSet(numSecondaryKeys);
        BitSet setHighKeys = new BitSet(numSecondaryKeys);
        // Go through the func exprs listed as optimizable by the chosen index,
        // and formulate a range predicate on the secondary-index keys.

        // Also checks whether a type cast happened from a real (FLOAT, DOUBLE) value to an INT value,
        // since there is a rounding issue when dealing with the LT(<) or GT(>) operator.
        for (Pair<Integer, Integer> exprIndex : exprAndVarList) {
            // Position of the field of matchedFuncExprs.get(exprIndex) in the chosen index's indexed exprs.
            IOptimizableFuncExpr optFuncExpr = analysisCtx.getMatchedFuncExpr(exprIndex.first);
            int keyPos = indexOf(optFuncExpr.getFieldName(0), optFuncExpr.getFieldSource(0), chosenIndexKeyFieldNames,
                    chosenIndexKeyFieldSourceIndicators);
            if (keyPos < 0 && optFuncExpr.getNumLogicalVars() > 1) {
                // If we are optimizing a join, the matching field may be the second field name.
                keyPos = indexOf(optFuncExpr.getFieldName(1), optFuncExpr.getFieldSource(1), chosenIndexKeyFieldNames,
                        chosenIndexKeyFieldSourceIndicators);
            }
            if (keyPos < 0) {
                // Neither field of the expression is a key of the chosen index.
                throw CompilationException.create(ErrorCode.NO_INDEX_FIELD_NAME_FOR_GIVEN_FUNC_EXPR,
                        optFuncExpr.getFuncExpr().getSourceLocation());
            }
            // returnedSearchKeyExpr contains a pair of search expressions.
            // The second expression will not be null only if we are creating an EQ search predicate
            // with a FLOAT or a DOUBLE constant that will be fed into an INTEGER index.
            // This is required because of type-casting. Refer to AccessMethodUtils.createSearchKeyExpr for details.
            IAType indexedFieldType = chosenIndexKeyFieldTypes.get(keyPos);
            Triple<ILogicalExpression, ILogicalExpression, Boolean> returnedSearchKeyExpr =
                    AccessMethodUtils.createSearchKeyExpr(chosenIndex, optFuncExpr, indexedFieldType, probeSubTree,
                            SEARCH_KEY_ROUNDING_FUNCTION_COMPUTER);
            ILogicalExpression searchKeyExpr = returnedSearchKeyExpr.first;
            ILogicalExpression searchKeyEQExpr = null;
            boolean realTypeConvertedToIntegerType = returnedSearchKeyExpr.third;
            // Remember across all expressions whether any real-to-integer conversion took place.
            anyRealTypeConvertedToIntegerType |= realTypeConvertedToIntegerType;

            LimitType limit = getLimitType(optFuncExpr, probeSubTree);
            if (limit == null) {
                // The kind of bound cannot be determined: give up on this index.
                return null;
            }

            if (limit == LimitType.EQUAL && returnedSearchKeyExpr.second != null) {
                // The given search predicate is EQ and
                // we have two type-casted values from FLOAT or DOUBLE to an INT constant.
                searchKeyEQExpr = returnedSearchKeyExpr.second;
            }

            // Deals with the non-enforced index case here: an exclusive bound is relaxed to an inclusive one.
            if (relaxLimitTypeToInclusive(chosenIndex, indexedFieldType, realTypeConvertedToIntegerType)) {
                if (limit == LimitType.HIGH_EXCLUSIVE) {
                    limit = LimitType.HIGH_INCLUSIVE;
                } else if (limit == LimitType.LOW_EXCLUSIVE) {
                    limit = LimitType.LOW_INCLUSIVE;
                }
            }

            // A search key that is a function call is replaced by a reference to a fresh variable.
            // The call and the variable are recorded per key position (low side, high side, or both for EQ)
            // and are handed to createKeyVarsAndExprs further below.
            if (searchKeyExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
                LogicalVariable constAtRuntimeExprVar = context.newVar();
                VariableReferenceExpression constAtRuntimeExprVarRef =
                        new VariableReferenceExpression(constAtRuntimeExprVar);
                constAtRuntimeExprVarRef.setSourceLocation(searchKeyExpr.getSourceLocation());

                if (limit == LimitType.LOW_INCLUSIVE || limit == LimitType.LOW_EXCLUSIVE || limit == LimitType.EQUAL) {
                    lowKeyConstAtRuntimeExpressions[keyPos] = searchKeyExpr;
                    lowKeyConstAtRuntimeExprVars[keyPos] = constAtRuntimeExprVar;
                }
                if (limit == LimitType.HIGH_INCLUSIVE || limit == LimitType.HIGH_EXCLUSIVE
                        || limit == LimitType.EQUAL) {
                    highKeyConstantAtRuntimeExpressions[keyPos] = searchKeyExpr;
                    highKeyConstAtRuntimeExprVars[keyPos] = constAtRuntimeExprVar;
                }
                searchKeyExpr = constAtRuntimeExprVarRef;
            }

            // Fold the bound into the low/high key state of this key position.
            switch (limit) {
                case EQUAL: {
                    if (lowKeyLimits[keyPos] == null && highKeyLimits[keyPos] == null) {
                        // First bound on this key: EQ sets both sides, inclusive.
                        lowKeyLimits[keyPos] = highKeyLimits[keyPos] = limit;
                        lowKeyInclusive[keyPos] = highKeyInclusive[keyPos] = true;
                        if (searchKeyEQExpr == null) {
                            // No type-casting happened.
                            lowKeyExprs[keyPos] = highKeyExprs[keyPos] = searchKeyExpr;
                        } else {
                            // We have two type-casted FLOAT or DOUBLE values to be fed into an INT index.
                            // They contain the same value if their fraction value is 0.
                            // Refer to AccessMethodUtils.createSearchKeyExpr() for more details.
                            lowKeyExprs[keyPos] = searchKeyExpr;
                            highKeyExprs[keyPos] = searchKeyEQExpr;
                        }
                        setLowKeys.set(keyPos);
                        setHighKeys.set(keyPos);
                        isEqCondition = true;
                    } else {
                        // Has already been set to the identical values.
                        // When optimizing join we may encounter the same optimizable expression twice
                        // (once from analyzing each side of the join)
                        // NOTE(review): the high key is compared with searchKeyExpr even when the first
                        // occurrence stored searchKeyEQExpr as the high key; in that situation the duplicate
                        // is not recognized and we bail out below - confirm this is intended.
                        // NOTE(review): lowKeyExprs/highKeyExprs may still be null here when only one side
                        // was bounded before - presumably guarded by the limit comparison; verify.
                        if (lowKeyLimits[keyPos] == limit && lowKeyInclusive[keyPos] == true
                                && lowKeyExprs[keyPos].equals(searchKeyExpr) && highKeyLimits[keyPos] == limit
                                && highKeyInclusive[keyPos] == true && highKeyExprs[keyPos].equals(searchKeyExpr)) {
                            isEqCondition = true;
                            // Leaves the switch only; the "all keys set" check below is skipped for duplicates.
                            break;
                        }
                        couldntFigureOut = true;
                    }
                    // TODO: For now don't consider prefix searches.
                    // If high and low keys are set, we exit for now.
                    if (setLowKeys.cardinality() == numSecondaryKeys && setHighKeys.cardinality() == numSecondaryKeys) {
                        doneWithExprs = true;
                    }
                    break;
                }
                case HIGH_EXCLUSIVE: {
                    // An exclusive high bound is taken when no high bound exists yet or when the existing one
                    // is inclusive (the existing bound is overwritten in that case).
                    if (highKeyLimits[keyPos] == null || (highKeyLimits[keyPos] != null && highKeyInclusive[keyPos])) {
                        highKeyLimits[keyPos] = limit;
                        highKeyExprs[keyPos] = searchKeyExpr;
                        highKeyInclusive[keyPos] = false;
                    } else {
                        // Has already been set to the identical values. When optimizing join we may encounter the
                        // same optimizable expression twice
                        // (once from analyzing each side of the join)
                        if (highKeyLimits[keyPos] == limit && highKeyInclusive[keyPos] == false
                                && highKeyExprs[keyPos].equals(searchKeyExpr)) {
                            break;
                        }
                        // A different exclusive high bound already exists: bail out.
                        couldntFigureOut = true;
                        doneWithExprs = true;
                    }
                    break;
                }
                case HIGH_INCLUSIVE: {
                    // An inclusive high bound is only taken when no high bound exists yet.
                    if (highKeyLimits[keyPos] == null) {
                        highKeyLimits[keyPos] = limit;
                        highKeyExprs[keyPos] = searchKeyExpr;
                        highKeyInclusive[keyPos] = true;
                    } else {
                        // Has already been set to the identical values. When optimizing join we may encounter the
                        // same optimizable expression twice
                        // (once from analyzing each side of the join)
                        if (highKeyLimits[keyPos] == limit && highKeyInclusive[keyPos] == true
                                && highKeyExprs[keyPos].equals(searchKeyExpr)) {
                            break;
                        }
                        // A different high bound already exists: bail out.
                        couldntFigureOut = true;
                        doneWithExprs = true;
                    }
                    break;
                }
                case LOW_EXCLUSIVE: {
                    // An exclusive low bound is taken when no low bound exists yet or when the existing one
                    // is inclusive (the existing bound is overwritten in that case).
                    if (lowKeyLimits[keyPos] == null || (lowKeyLimits[keyPos] != null && lowKeyInclusive[keyPos])) {
                        lowKeyLimits[keyPos] = limit;
                        lowKeyExprs[keyPos] = searchKeyExpr;
                        lowKeyInclusive[keyPos] = false;
                    } else {
                        // Has already been set to the identical values. When optimizing join we may encounter the
                        // same optimizable expression twice
                        // (once from analyzing each side of the join)
                        if (lowKeyLimits[keyPos] == limit && lowKeyInclusive[keyPos] == false
                                && lowKeyExprs[keyPos].equals(searchKeyExpr)) {
                            break;
                        }
                        // A different exclusive low bound already exists: bail out.
                        couldntFigureOut = true;
                        doneWithExprs = true;
                    }
                    break;
                }
                case LOW_INCLUSIVE: {
                    // An inclusive low bound is only taken when no low bound exists yet.
                    if (lowKeyLimits[keyPos] == null) {
                        lowKeyLimits[keyPos] = limit;
                        lowKeyExprs[keyPos] = searchKeyExpr;
                        lowKeyInclusive[keyPos] = true;
                    } else {
                        // Has already been set to the identical values. When optimizing join we may encounter the
                        // same optimizable expression twice
                        // (once from analyzing each side of the join)
                        if (lowKeyLimits[keyPos] == limit && lowKeyInclusive[keyPos] == true
                                && lowKeyExprs[keyPos].equals(searchKeyExpr)) {
                            break;
                        }
                        // A different low bound already exists: bail out.
                        couldntFigureOut = true;
                        doneWithExprs = true;
                    }
                    break;
                }
                default: {
                    throw new IllegalStateException();
                }
            }
            if (!couldntFigureOut) {
                // Remember to remove this funcExpr later.
                replacedFuncExprs.add(analysisCtx.getMatchedFuncExpr(exprIndex.first).getFuncExpr());
            }
            if (doneWithExprs) {
                break;
            }
        }
        if (couldntFigureOut) {
            // At least one expression could not be integrated into the range predicate.
            return null;
        }

        // If we have composite search keys, we always need post-processing to ensure the correctness
        // of search results because of the way a BTree is searched, unless only the last key is a range search.
        // During a BTree search, we iterate from the start index
        // (based on the low keys) to the end index (based on the high keys). During the iteration,
        // we can encounter a lot of false positives.
        // Hence every key except the last one must be an EQ bound on both sides to avoid post-processing.
        boolean primaryIndexPostProccessingIsNeeded = false;
        for (int i = 0; i < numSecondaryKeys - 1; i++) {
            if (!LimitType.EQUAL.equals(lowKeyLimits[i]) || !LimitType.EQUAL.equals(highKeyLimits[i])) {
                primaryIndexPostProccessingIsNeeded = true;
            }
        }

        // Determine cases when a prefix search could be applied: as soon as key i differs from key 0 in whether
        // it has a low (or a high) bound, only the first i keys are searched and post-processing is required.
        for (int i = 1; i < lowKeyExprs.length; i++) {
            if (lowKeyLimits[0] == null && lowKeyLimits[i] != null || lowKeyLimits[0] != null && lowKeyLimits[i] == null
                    || highKeyLimits[0] == null && highKeyLimits[i] != null
                    || highKeyLimits[0] != null && highKeyLimits[i] == null) {
                numSecondaryKeys = i;
                primaryIndexPostProccessingIsNeeded = true;
                break;
            }
        }

        // When post-processing is needed, every bound is widened to inclusive;
        // presumably the retained condition filters out the extra tuples afterwards - TODO confirm.
        if (primaryIndexPostProccessingIsNeeded) {
            Arrays.fill(lowKeyInclusive, true);
            Arrays.fill(highKeyInclusive, true);
        }

        // A side without any bound on the first key is marked inclusive.
        if (lowKeyLimits[0] == null) {
            lowKeyInclusive[0] = true;
        }
        if (highKeyLimits[0] == null) {
            highKeyInclusive[0] = true;
        }

        // Here we generate vars and funcs for assigning the secondary-index keys to be fed into
        // the secondary-index search.
        // List of variables for the assign.
        ArrayList<LogicalVariable> keyVarList = new ArrayList<>();
        // List of variables and expressions for the assign.
        ArrayList<LogicalVariable> assignKeyVarList = new ArrayList<>();
        ArrayList<Mutable<ILogicalExpression>> assignKeyExprList = new ArrayList<>();
        // Low keys are added first, then high keys; keyVarList is later split at numLowKeys.
        int numLowKeys = createKeyVarsAndExprs(numSecondaryKeys, lowKeyLimits, lowKeyExprs, assignKeyVarList,
                assignKeyExprList, keyVarList, context, lowKeyConstAtRuntimeExpressions, lowKeyConstAtRuntimeExprVars);
        int numHighKeys = createKeyVarsAndExprs(numSecondaryKeys, highKeyLimits, highKeyExprs, assignKeyVarList,
                assignKeyExprList, keyVarList, context, highKeyConstantAtRuntimeExpressions,
                highKeyConstAtRuntimeExprVars);

        BTreeJobGenParams jobGenParams =
                new BTreeJobGenParams(chosenIndex.getIndexName(), IndexType.BTREE, dataset.getDatabaseName(),
                        dataset.getDataverseName(), dataset.getDatasetName(), retainInput, requiresBroadcast);
        // Inclusiveness is read from the first key when post-processing is needed (all entries are true then),
        // otherwise from the last searched key.
        jobGenParams
                .setLowKeyInclusive(lowKeyInclusive[primaryIndexPostProccessingIsNeeded ? 0 : numSecondaryKeys - 1]);
        jobGenParams
                .setHighKeyInclusive(highKeyInclusive[primaryIndexPostProccessingIsNeeded ? 0 : numSecondaryKeys - 1]);
        jobGenParams.setIsEqCondition(isEqCondition);
        jobGenParams.setLowKeyVarList(keyVarList, 0, numLowKeys);
        jobGenParams.setHighKeyVarList(keyVarList, numLowKeys, numHighKeys);

        // Pick the operator that feeds the index search.
        ILogicalOperator inputOp;
        if (!assignKeyVarList.isEmpty()) {
            // Assign operator that sets the constant secondary-index search-key fields if necessary.
            AssignOperator assignSearchKeys = new AssignOperator(assignKeyVarList, assignKeyExprList);
            assignSearchKeys.setSourceLocation(dataSourceOp.getSourceLocation());
            if (probeSubTree == null) {
                // We are optimizing a selection query.
                // Input to this assign is the EmptyTupleSource (which the dataSourceScan also must have had as input).
                // A deep copy of the scan's input is used rather than the original operator.
                assignSearchKeys.getInputs().add(new MutableObject<>(
                        OperatorManipulationUtil.deepCopy(dataSourceOp.getInputs().get(0).getValue())));
                assignSearchKeys.setExecutionMode(dataSourceOp.getExecutionMode());
            } else {
                // We are optimizing a join, place the assign op top of the probe subtree.
                assignSearchKeys.getInputs().add(probeSubTree.getRootRef());
                assignSearchKeys.setExecutionMode(probeSubTree.getRootRef().getValue().getExecutionMode());
            }
            context.computeAndSetTypeEnvironmentForOperator(assignSearchKeys);
            inputOp = assignSearchKeys;
        } else if (probeSubTree == null) {
            // Nonpure case: no assign is needed and there is no probe subtree.
            // Make sure that the nonpure function is unpartitioned: walk down the chain of single-input
            // operators below the scan until an UNPARTITIONED operator is found; give up when an operator
            // that is not UNPARTITIONED does not have exactly one input.
            ILogicalOperator checkOp = dataSourceOp.getInputs().get(0).getValue();
            while (checkOp.getExecutionMode() != ExecutionMode.UNPARTITIONED) {
                if (checkOp.getInputs().size() == 1) {
                    checkOp = checkOp.getInputs().get(0).getValue();
                } else {
                    return null;
                }
            }
            inputOp = dataSourceOp.getInputs().get(0).getValue();
        } else {
            // All index search keys are variables.
            inputOp = probeSubTree.getRoot();
        }

        // Creates an unnest-map for the secondary index search.
        // The result: SK, PK, [Optional - the result of an instantTrylock on PK]
        ILogicalOperator secondaryIndexUnnestOp = AccessMethodUtils.createSecondaryIndexUnnestMap(dataset, recordType,
                metaRecordType, chosenIndex, inputOp, jobGenParams, context, retainInput, retainMissing,
                generateInstantTrylockResultFromIndexSearch, leftOuterMissingValue);

        // Generate the rest of the upstream plan which feeds the search results into the primary index.
        ILogicalOperator indexSearchOp = null;

        boolean isPrimaryIndex = chosenIndex.isPrimaryIndex();
        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
            // External dataset: the data-source reference is replaced in place and the lookup operator is
            // returned directly.
            UnnestMapOperator externalDataAccessOp =
                    AccessMethodUtils.createExternalDataLookupUnnestMap(dataSourceOp, dataset, recordType,
                            metaRecordType, secondaryIndexUnnestOp, context, chosenIndex, retainInput, retainMissing);
            indexSubTree.getDataSourceRef().setValue(externalDataAccessOp);
            return externalDataAccessOp;
        } else if (!isPrimaryIndex) {
            // Secondary index search case
            indexSearchOp = AccessMethodUtils.createRestOfIndexSearchPlan(afterTopOpRefs, topOpRef, conditionRef,
                    assignBeforeTheOpRefs, dataSourceOp, dataset, recordType, metaRecordType, secondaryIndexUnnestOp,
                    context, true, retainInput, retainMissing, false, chosenIndex, analysisCtx, indexSubTree,
                    probeSubTree, newMissingNullPlaceHolderForLOJ, leftOuterMissingValue,
                    anyRealTypeConvertedToIntegerType);

            // Replaces the datasource scan with the new plan rooted at
            // Get dataSourceRef operator -
            // 1) unnest-map (PK, record) for a non-index only plan
            // 2) unnest-map (SK, PK) for an index-only plan
            if (isIndexOnlyPlan) {
                // Index-only plan: equivalence classes are only added when the data source found in the new
                // plan is a (left-outer) unnest-map.
                ILogicalOperator dataSourceRefOp =
                        AccessMethodUtils.findDataSourceFromIndexUtilizationPlan(indexSearchOp);

                if (dataSourceRefOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP
                        || dataSourceRefOp.getOperatorTag() == LogicalOperatorTag.LEFT_OUTER_UNNEST_MAP) {
                    // Adds equivalence classes --- one equivalent class between a primary key
                    // variable and a record field-access expression.
                    EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(indexSearchOp,
                            dataSourceOp.getVariables(), recordType, metaRecordType, dataset, context);
                }
            } else {
                // Non-indexonly plan cases
                // Adds equivalence classes --- one equivalent class between a primary key
                // variable and a record field-access expression.
                EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(indexSearchOp,
                        dataSourceOp.getVariables(), recordType, metaRecordType, dataset, context);
            }
        } else {
            // Primary index search case
            List<Object> primaryIndexOutputTypes = new ArrayList<>();
            AccessMethodUtils.appendPrimaryIndexTypes(dataset, recordType, metaRecordType, primaryIndexOutputTypes);
            List<LogicalVariable> scanVariables = dataSourceOp.getVariables();

            // Checks whether the primary index search can replace the given SELECT condition.
            // If so, the condition will be set to null and eventually the SELECT operator will be removed.
            // If not, we create a new condition based on remaining ones.
            // When post-processing is needed the condition is left untouched.
            if (!primaryIndexPostProccessingIsNeeded) {
                List<Mutable<ILogicalExpression>> remainingFuncExprs = new ArrayList<>();
                try {
                    getNewConditionExprs(conditionRef, replacedFuncExprs, remainingFuncExprs);
                } catch (CompilationException e) {
                    // The remaining condition cannot be computed: report that no plan could be built
                    // instead of propagating the exception.
                    return null;
                }
                // Generates the new condition.
                if (!remainingFuncExprs.isEmpty()) {
                    ILogicalExpression pulledCond = createSelectCondition(remainingFuncExprs);
                    conditionRef.setValue(pulledCond);
                } else {
                    conditionRef.setValue(null);
                }
            }

            // Checks whether LEFT_OUTER_UNNESTMAP operator is required: only when both the input and
            // missing values have to be retained.
            boolean leftOuterUnnestMapRequired = false;
            if (retainMissing && retainInput) {
                leftOuterUnnestMapRequired = true;
            } else {
                leftOuterUnnestMapRequired = false;
            }
            AbstractUnnestMapOperator unnestMapOp;
            if (conditionRef.getValue() != null) {
                // A condition remains, so a fresh index-search function expression is built.
                // The job gen parameters are transferred to the actual job gen
                // via the UnnestMapOperator's function arguments.
                List<Mutable<ILogicalExpression>> primaryIndexFuncArgs = new ArrayList<>();
                jobGenParams.writeToFuncArgs(primaryIndexFuncArgs);
                // An index search is expressed as an unnest-map over an index-search function.
                IFunctionInfo primaryIndexSearch = FunctionUtil.getFunctionInfo(BuiltinFunctions.INDEX_SEARCH);
                UnnestingFunctionCallExpression primaryIndexSearchFunc =
                        new UnnestingFunctionCallExpression(primaryIndexSearch, primaryIndexFuncArgs);
                primaryIndexSearchFunc.setSourceLocation(dataSourceOp.getSourceLocation());
                primaryIndexSearchFunc.setReturnsUniqueValues(true);
                if (!leftOuterUnnestMapRequired) {
                    unnestMapOp = new UnnestMapOperator(scanVariables, new MutableObject<>(primaryIndexSearchFunc),
                            primaryIndexOutputTypes, retainInput);
                } else {
                    unnestMapOp =
                            new LeftOuterUnnestMapOperator(scanVariables, new MutableObject<>(primaryIndexSearchFunc),
                                    primaryIndexOutputTypes, leftOuterMissingValue);
                }
            } else {
                // No condition remains: the search expression of the unnest-map created above is reused.
                // The casts assume that operator has the type matching leftOuterUnnestMapRequired.
                if (!leftOuterUnnestMapRequired) {
                    unnestMapOp = new UnnestMapOperator(scanVariables,
                            ((UnnestMapOperator) secondaryIndexUnnestOp).getExpressionRef(), primaryIndexOutputTypes,
                            retainInput);
                } else {
                    unnestMapOp = new LeftOuterUnnestMapOperator(scanVariables,
                            ((LeftOuterUnnestMapOperator) secondaryIndexUnnestOp).getExpressionRef(),
                            primaryIndexOutputTypes, leftOuterMissingValue);
                }
            }
            unnestMapOp.setExecutionMode(ExecutionMode.PARTITIONED);
            unnestMapOp.setSourceLocation(dataSourceOp.getSourceLocation());
            unnestMapOp.getInputs().add(new MutableObject<>(inputOp));
            context.computeAndSetTypeEnvironmentForOperator(unnestMapOp);
            indexSearchOp = unnestMapOp;

            // Adds equivalence classes --- one equivalent class between a primary key
            // variable and a record field-access expression.
            EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(indexSearchOp, scanVariables, recordType,
                    metaRecordType, dataset, context);
        }

        // Carry the cardinality/cost annotations of the replaced scan over to the new search operator.
        OperatorManipulationUtil.copyCardCostAnnotations(dataSourceOp, indexSearchOp);
        return indexSearchOp;
    }