public boolean rewritePost()

in asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java [116:689]


    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
            throws AlgebricksException {
        AbstractLogicalOperator op0 = (AbstractLogicalOperator) opRef.getValue();
        if (op0.getOperatorTag() != LogicalOperatorTag.DELEGATE_OPERATOR
                && op0.getOperatorTag() != LogicalOperatorTag.SINK) {
            return false;
        }
        if (op0.getOperatorTag() == LogicalOperatorTag.DELEGATE_OPERATOR) {
            DelegateOperator eOp = (DelegateOperator) op0;
            if (!(eOp.getDelegate() instanceof CommitOperator)) {
                return false;
            }
        }
        AbstractLogicalOperator op1 = (AbstractLogicalOperator) op0.getInputs().get(0).getValue();
        if (op1.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT) {
            return false;
        }
        /** find the record variable */
        InsertDeleteUpsertOperator primaryIndexModificationOp =
                (InsertDeleteUpsertOperator) op0.getInputs().get(0).getValue();
        boolean isBulkload = primaryIndexModificationOp.isBulkload();
        ILogicalExpression newRecordExpr = primaryIndexModificationOp.getPayloadExpression().getValue();
        List<Mutable<ILogicalExpression>> newMetaExprs =
                primaryIndexModificationOp.getAdditionalNonFilteringExpressions();
        LogicalVariable newRecordVar;
        LogicalVariable newMetaVar = null;
        sourceLoc = primaryIndexModificationOp.getSourceLocation();
        this.context = context;

        /**
         * inputOp is the assign operator which extracts primary keys from the input
         * variables (record or meta)
         */
        AbstractLogicalOperator inputOp =
                (AbstractLogicalOperator) primaryIndexModificationOp.getInputs().get(0).getValue();
        newRecordVar = getRecordVar(inputOp, newRecordExpr, 0);
        if (newMetaExprs != null && !newMetaExprs.isEmpty()) {
            if (newMetaExprs.size() > 1) {
                throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc,
                        "Number of meta records can't be more than 1. Number of meta records found = "
                                + newMetaExprs.size());
            }
            newMetaVar = getRecordVar(inputOp, newMetaExprs.get(0).getValue(), 1);
        }

        /*
         * At this point, we have the record variable and the insert/delete/upsert operator
         * Note: We have two operators:
         * 1. An InsertDeleteOperator (primary)
         * 2. An IndexInsertDeleteOperator (secondary)
         * The current primaryIndexModificationOp is of the first type
         */

        DataSource datasetSource = (DataSource) primaryIndexModificationOp.getDataSource();
        MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
        DataverseName dataverseName = datasetSource.getId().getDataverseName();
        String database = datasetSource.getId().getDatabaseName();
        String datasetName = datasetSource.getId().getDatasourceName();
        Dataset dataset = mp.findDataset(database, dataverseName, datasetName);
        if (dataset == null) {
            throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, sourceLoc, datasetName,
                    MetadataUtil.dataverseName(database, dataverseName, mp.isUsingDatabase()));
        }
        if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
            return false;
        }

        // Create operators for secondary index insert / delete.
        String itemTypeName = dataset.getItemTypeName();
        IAType itemType =
                mp.findType(dataset.getItemTypeDatabaseName(), dataset.getItemTypeDataverseName(), itemTypeName);
        if (itemType.getTypeTag() != ATypeTag.OBJECT) {
            throw new CompilationException(ErrorCode.COMPILATION_ERROR, sourceLoc, "Only record types can be indexed.");
        }
        ARecordType recType = (ARecordType) itemType;
        // meta type
        ARecordType metaType = null;
        if (dataset.hasMetaPart()) {
            metaType = (ARecordType) mp.findType(dataset.getMetaItemTypeDatabaseName(),
                    dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
        }
        recType = (ARecordType) mp.findTypeForDatasetWithoutType(recType, metaType, dataset);

        List<Index> indexes =
                mp.getDatasetIndexes(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName());
        Stream<Index> indexStream = indexes.stream();
        indexStream = indexStream.filter(index -> index.getIndexType() != IndexType.SAMPLE);
        if (primaryIndexModificationOp.getOperation() == Kind.INSERT && !primaryIndexModificationOp.isBulkload()) {
            // for insert, primary key index is handled together when primary index
            indexStream = indexStream.filter(index -> !index.isPrimaryKeyIndex());
        }
        indexes = indexStream.collect(Collectors.toList());

        // Put an n-gram or a keyword index in the later stage of index-update,
        // since TokenizeOperator needs to be involved.
        Collections.sort(indexes, (o1, o2) -> o1.getIndexType().ordinal() - o2.getIndexType().ordinal());

        // Set the top operator pointer to the primary IndexInsertDeleteOperator
        ILogicalOperator currentTop = primaryIndexModificationOp;

        // At this point, we have the data type info, and the indexes info as well
        int secondaryIndexTotalCnt = indexes.size() - 1;
        if (secondaryIndexTotalCnt > 0) {
            op0.getInputs().clear();
        } else {
            return false;
        }
        // Initialize inputs to the SINK operator Op0 (The SINK) is now without input
        // Prepare filtering field information (This is the filter created using the "filter with" key word in the
        // create dataset ddl)
        List<String> filteringFields = ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField();
        List<LogicalVariable> filteringVars;
        List<Mutable<ILogicalExpression>> filteringExpressions = null;

        if (filteringFields != null) {
            // The filter field var already exists. we can simply get it from the insert op
            filteringVars = new ArrayList<>();
            filteringExpressions = new ArrayList<>();
            for (Mutable<ILogicalExpression> filteringExpression : primaryIndexModificationOp
                    .getAdditionalFilteringExpressions()) {
                filteringExpression.getValue().getUsedVariables(filteringVars);
                for (LogicalVariable var : filteringVars) {
                    VariableReferenceExpression varRef = new VariableReferenceExpression(var);
                    varRef.setSourceLocation(filteringExpression.getValue().getSourceLocation());
                    filteringExpressions.add(new MutableObject<ILogicalExpression>(varRef));
                }
            }
        }

        // Replicate Operator is applied only when doing the bulk-load.
        ReplicateOperator replicateOp = null;
        if (secondaryIndexTotalCnt > 1 && isBulkload) {
            // Split the logical plan into "each secondary index update branch"
            // to replicate each <PK,OBJECT> pair.
            replicateOp = new ReplicateOperator(secondaryIndexTotalCnt);
            replicateOp.setSourceLocation(sourceLoc);
            replicateOp.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
            replicateOp.setExecutionMode(ExecutionMode.PARTITIONED);
            context.computeAndSetTypeEnvironmentForOperator(replicateOp);
            currentTop = replicateOp;
        }

        /*
         * The two maps are used to store variables to which [casted] field access is assigned.
         * One for the beforeOp record and the other for the new record.
         * There are two uses for these maps:
         * 1. used for shared fields in indexes with overlapping keys.
         * 2. used for setting variables of secondary keys for each secondary index operator.
         */
        Map<IndexFieldId, LogicalVariable> fieldVarsForBeforeOperation = new HashMap<>();
        Map<IndexFieldId, LogicalVariable> fieldVarsForNewRecord = new HashMap<>();
        /*
         * if the index is enforcing field types (For open indexes), We add a cast
         * operator to ensure type safety
         */
        if (primaryIndexModificationOp.getOperation() == Kind.INSERT
                || primaryIndexModificationOp.getOperation() == Kind.UPSERT
                /* Actually, delete should not be here but it is now until issue
                 * https://issues.apache.org/jira/browse/ASTERIXDB-1507
                 * is solved
                 */
                || primaryIndexModificationOp.getOperation() == Kind.DELETE) {
            injectFieldAccessesForIndexes(dataset, indexes, fieldVarsForNewRecord, recType, metaType, newRecordVar,
                    newMetaVar, primaryIndexModificationOp, false);
            if (replicateOp != null) {
                context.computeAndSetTypeEnvironmentForOperator(replicateOp);
            }
        }
        if (primaryIndexModificationOp.getOperation() == Kind.UPSERT
        /* Actually, delete should be here but it is not until issue
         * https://issues.apache.org/jira/browse/ASTERIXDB-1507
         * is solved
         */) {
            List<LogicalVariable> beforeOpMetaVars = primaryIndexModificationOp.getBeforeOpAdditionalNonFilteringVars();
            LogicalVariable beforeOpMetaVar = beforeOpMetaVars == null ? null : beforeOpMetaVars.get(0);
            currentTop = injectFieldAccessesForIndexes(dataset, indexes, fieldVarsForBeforeOperation, recType, metaType,
                    primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar, currentTop, true);
        }

        // Add the appropriate SIDX maintenance operations.
        for (Index index : indexes) {
            if (!index.isSecondaryIndex()) {
                continue;
            }

            // Get the secondary fields names and types
            List<List<String>> secondaryKeyFields = null;
            List<IAType> secondaryKeyTypes = null;
            List<Integer> secondaryKeySources = null;
            switch (Index.IndexCategory.of(index.getIndexType())) {
                case VALUE:
                    Index.ValueIndexDetails valueIndexDetails = (Index.ValueIndexDetails) index.getIndexDetails();
                    secondaryKeyFields = valueIndexDetails.getKeyFieldNames();
                    secondaryKeyTypes = valueIndexDetails.getKeyFieldTypes();
                    secondaryKeySources = valueIndexDetails.getKeyFieldSourceIndicators();
                    break;
                case TEXT:
                    Index.TextIndexDetails textIndexDetails = (Index.TextIndexDetails) index.getIndexDetails();
                    secondaryKeyFields = textIndexDetails.getKeyFieldNames();
                    secondaryKeyTypes = textIndexDetails.getKeyFieldTypes();
                    secondaryKeySources = textIndexDetails.getKeyFieldSourceIndicators();
                    break;
                case ARRAY:
                    // These details are handled separately for array indexes.
                    break;
                default:
                    continue;
            }

            // Set our key variables and expressions for non-array indexes. Our secondary keys for array indexes will
            // always be an empty list.
            List<LogicalVariable> secondaryKeyVars = new ArrayList<>();
            List<LogicalVariable> beforeOpSecondaryKeyVars = new ArrayList<>();
            List<Mutable<ILogicalExpression>> secondaryExpressions = new ArrayList<>();
            List<Mutable<ILogicalExpression>> beforeOpSecondaryExpressions = new ArrayList<>();
            ILogicalOperator replicateOutput;
            if (!index.getIndexType().equals(IndexType.ARRAY)) {
                for (int i = 0; i < secondaryKeyFields.size(); i++) {
                    IAType skType = secondaryKeyTypes.get(i);
                    Integer skSrc = secondaryKeySources.get(i);
                    List<String> skName = secondaryKeyFields.get(i);
                    ARecordType sourceType = dataset.hasMetaPart()
                            ? skSrc.intValue() == Index.RECORD_INDICATOR ? recType : metaType : recType;
                    IndexFieldId indexFieldId = createIndexFieldId(index, skName, skType, skSrc, sourceType, sourceLoc);
                    LogicalVariable skVar = fieldVarsForNewRecord.get(indexFieldId);
                    secondaryKeyVars.add(skVar);
                    VariableReferenceExpression skVarRef = new VariableReferenceExpression(skVar);
                    skVarRef.setSourceLocation(sourceLoc);
                    secondaryExpressions.add(new MutableObject<>(skVarRef));
                    if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                        LogicalVariable beforeKeyVar = fieldVarsForBeforeOperation.get(indexFieldId);
                        beforeOpSecondaryKeyVars.add(beforeKeyVar);
                        VariableReferenceExpression varRef = new VariableReferenceExpression(beforeKeyVar);
                        varRef.setSourceLocation(sourceLoc);
                        beforeOpSecondaryExpressions.add(new MutableObject<>(varRef));
                    }
                }
            }

            IndexInsertDeleteUpsertOperator indexUpdate;
            if (index.getIndexType() != IndexType.RTREE) {
                // B-Tree, inverted index, array index
                // Create an expression per key
                Mutable<ILogicalExpression> filterExpression =
                        createFilterExpression(index, secondaryKeyVars, context.getOutputTypeEnvironment(currentTop),
                                index.getIndexDetails().isOverridingKeyFieldTypes());
                Mutable<ILogicalExpression> beforeOpFilterExpression = null;
                if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                    beforeOpFilterExpression = createFilterExpression(index, beforeOpSecondaryKeyVars,
                            context.getOutputTypeEnvironment(currentTop),
                            index.getIndexDetails().isOverridingKeyFieldTypes());
                }
                DataSourceIndex dataSourceIndex = new DataSourceIndex(index, database, dataverseName, datasetName, mp);

                // Introduce the TokenizeOperator only when doing bulk-load,
                // and index type is keyword or n-gram.
                if (index.getIndexType() != IndexType.BTREE && index.getIndexType() != IndexType.ARRAY
                        && primaryIndexModificationOp.isBulkload()) {
                    // Note: Bulk load case, we don't need to take care of it for upsert operation
                    // Check whether the index is length-partitioned or not.
                    // If partitioned, [input variables to TokenizeOperator,
                    // token, number of token] pairs will be generated and
                    // fed into the IndexInsertDeleteOperator.
                    // If not, [input variables, token] pairs will be generated
                    // and fed into the IndexInsertDeleteOperator.
                    // Input variables are passed since TokenizeOperator is not an
                    // filtering operator.
                    boolean isPartitioned = index.getIndexType() == IndexType.LENGTH_PARTITIONED_WORD_INVIX
                            || index.getIndexType() == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;

                    // Create a new logical variable - token
                    List<LogicalVariable> tokenizeKeyVars = new ArrayList<>();
                    List<Mutable<ILogicalExpression>> tokenizeKeyExprs = new ArrayList<>();
                    LogicalVariable tokenVar = context.newVar();
                    tokenizeKeyVars.add(tokenVar);
                    VariableReferenceExpression tokenVarRef = new VariableReferenceExpression(tokenVar);
                    tokenVarRef.setSourceLocation(sourceLoc);
                    tokenizeKeyExprs.add(new MutableObject<ILogicalExpression>(tokenVarRef));

                    // Check the field type of the secondary key.
                    IAType secondaryKeyType;
                    Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index,
                            secondaryKeyTypes.get(0), secondaryKeyFields.get(0), recType);
                    secondaryKeyType = keyPairType.first;

                    List<Object> varTypes = new ArrayList<>();
                    varTypes.add(NonTaggedFormatUtil.getTokenType(secondaryKeyType));

                    // If the index is a length-partitioned, then create
                    // additional variable - number of token.
                    // We use a special type for the length-partitioned index.
                    // The type is short, and this does not contain type info.
                    if (isPartitioned) {
                        LogicalVariable lengthVar = context.newVar();
                        tokenizeKeyVars.add(lengthVar);
                        VariableReferenceExpression lengthVarRef = new VariableReferenceExpression(lengthVar);
                        lengthVarRef.setSourceLocation(sourceLoc);
                        tokenizeKeyExprs.add(new MutableObject<ILogicalExpression>(lengthVarRef));
                        varTypes.add(BuiltinType.SHORTWITHOUTTYPEINFO);
                    }

                    // TokenizeOperator to tokenize [SK, PK] pairs
                    TokenizeOperator tokenUpdate = new TokenizeOperator(dataSourceIndex,
                            OperatorManipulationUtil
                                    .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
                            secondaryExpressions, tokenizeKeyVars,
                            filterExpression != null
                                    ? new MutableObject<>(filterExpression.getValue().cloneExpression()) : null,
                            primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
                            isPartitioned, varTypes);
                    tokenUpdate.setSourceLocation(sourceLoc);
                    tokenUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
                    context.computeAndSetTypeEnvironmentForOperator(tokenUpdate);
                    replicateOutput = tokenUpdate;
                    indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                            OperatorManipulationUtil
                                    .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
                            tokenizeKeyExprs, filterExpression, beforeOpFilterExpression,
                            primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
                            primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                    : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                    indexUpdate.setSourceLocation(sourceLoc);
                    indexUpdate.setAdditionalFilteringExpressions(
                            OperatorManipulationUtil.cloneExpressions(filteringExpressions));
                    indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(tokenUpdate));
                } else {
                    // When TokenizeOperator is not needed
                    indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                            OperatorManipulationUtil
                                    .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
                            secondaryExpressions, filterExpression, beforeOpFilterExpression,
                            primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
                            primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                    : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                    indexUpdate.setSourceLocation(sourceLoc);
                    indexUpdate.setAdditionalFilteringExpressions(
                            OperatorManipulationUtil.cloneExpressions(filteringExpressions));
                    replicateOutput = indexUpdate;
                    // We add the necessary expressions for upsert
                    if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                        indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions);
                        if (filteringFields != null) {
                            VariableReferenceExpression varRef =
                                    new VariableReferenceExpression(primaryIndexModificationOp.getBeforeOpFilterVar());
                            varRef.setSourceLocation(sourceLoc);
                            indexUpdate.setBeforeOpAdditionalFilteringExpression(
                                    new MutableObject<ILogicalExpression>(varRef));
                        }
                    }
                    indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));

                    // For array indexes we have no secondary keys to reference. We must add separate branches to
                    // first extract our keys.
                    if (index.getIndexType() == IndexType.ARRAY && !isBulkload) {
                        NestedTupleSourceOperator unnestSourceOp =
                                new NestedTupleSourceOperator(new MutableObject<>(indexUpdate));
                        unnestSourceOp.setSourceLocation(sourceLoc);
                        context.computeAndSetTypeEnvironmentForOperator(unnestSourceOp);
                        UnnestBranchCreator unnestSIDXBranch = buildUnnestBranch(unnestSourceOp, index, newRecordVar,
                                newMetaVar, recType, metaType, dataset.hasMetaPart());
                        unnestSIDXBranch.applyProjectOnly();

                        // If there exists a filter expression, add it to the top of our nested plan.
                        filterExpression = (primaryIndexModificationOp.getOperation() == Kind.UPSERT) ? null
                                : createAnyUnknownFilterExpression(unnestSIDXBranch.lastFieldVars,
                                        context.getOutputTypeEnvironment(unnestSIDXBranch.currentTop),
                                        index.getIndexDetails().isOverridingKeyFieldTypes());
                        if (filterExpression != null) {
                            unnestSIDXBranch.applyFilteringExpression(filterExpression);
                        }

                        // Finalize our nested plan.
                        ILogicalPlan unnestPlan = unnestSIDXBranch.buildBranch();
                        indexUpdate.getNestedPlans().add(unnestPlan);

                        // If we have an UPSERT, then create and add a branch to extract our old keys as well.
                        if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                            NestedTupleSourceOperator unnestBeforeSourceOp =
                                    new NestedTupleSourceOperator(new MutableObject<>(indexUpdate));
                            unnestBeforeSourceOp.setSourceLocation(sourceLoc);
                            context.computeAndSetTypeEnvironmentForOperator(unnestBeforeSourceOp);

                            List<LogicalVariable> beforeOpMetaVars =
                                    primaryIndexModificationOp.getBeforeOpAdditionalNonFilteringVars();
                            LogicalVariable beforeOpMetaVar = beforeOpMetaVars == null ? null : beforeOpMetaVars.get(0);
                            UnnestBranchCreator unnestBeforeSIDXBranch = buildUnnestBranch(unnestBeforeSourceOp, index,
                                    primaryIndexModificationOp.getBeforeOpRecordVar(), beforeOpMetaVar, recType,
                                    metaType, dataset.hasMetaPart());
                            unnestBeforeSIDXBranch.applyProjectOnly();
                            indexUpdate.getNestedPlans().add(unnestBeforeSIDXBranch.buildBranch());
                        }
                    } else if (index.getIndexType() == IndexType.ARRAY && isBulkload) {
                        // If we have a bulk load, we must sort the entire input by <SK, PK>. Do not use any
                        // nested plans here.
                        UnnestBranchCreator unnestSIDXBranch = buildUnnestBranch(currentTop, index, newRecordVar,
                                newMetaVar, recType, metaType, dataset.hasMetaPart());
                        unnestSIDXBranch.applyProjectDistinct(primaryIndexModificationOp.getPrimaryKeyExpressions(),
                                primaryIndexModificationOp.getAdditionalFilteringExpressions());
                        indexUpdate.getInputs().clear();
                        introduceNewOp(unnestSIDXBranch.currentTop, indexUpdate, true);

                        // Update the secondary expressions of our index.
                        secondaryExpressions = new ArrayList<>();
                        for (LogicalVariable var : unnestSIDXBranch.lastFieldVars) {
                            secondaryExpressions.add(new MutableObject<>(new VariableReferenceExpression(var)));
                        }
                        indexUpdate.setSecondaryKeyExprs(secondaryExpressions);

                        // Update the filter expression to include these new keys.
                        filterExpression = createAnyUnknownFilterExpression(unnestSIDXBranch.lastFieldVars,
                                context.getOutputTypeEnvironment(unnestSIDXBranch.currentTop),
                                index.getIndexDetails().isOverridingKeyFieldTypes());
                        indexUpdate.setFilterExpression(filterExpression);

                        if (replicateOp != null) {
                            // If we have a replicate, then update the replicate operator to include this branch.
                            replicateOp.getOutputs().add(new MutableObject<>(unnestSIDXBranch.currentBottom));
                            op0.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
                            continue;
                        }
                    }
                }
            } else {
                // Get type, dimensions and number of keys
                Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(index, secondaryKeyTypes.get(0),
                        secondaryKeyFields.get(0), recType);
                IAType spatialType = keyPairType.first;
                boolean isPointMBR =
                        spatialType.getTypeTag() == ATypeTag.POINT || spatialType.getTypeTag() == ATypeTag.POINT3D;
                int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
                int numKeys = (isPointMBR && isBulkload) ? dimension : dimension * 2;
                // Get variables and expressions
                List<LogicalVariable> keyVarList = new ArrayList<>();
                List<Mutable<ILogicalExpression>> keyExprList = new ArrayList<>();
                for (int i = 0; i < numKeys; i++) {
                    LogicalVariable keyVar = context.newVar();
                    keyVarList.add(keyVar);
                    AbstractFunctionCallExpression createMBR =
                            new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.CREATE_MBR));
                    createMBR.setSourceLocation(sourceLoc);
                    VariableReferenceExpression secondaryKeyVarRef =
                            new VariableReferenceExpression(secondaryKeyVars.get(0));
                    secondaryKeyVarRef.setSourceLocation(sourceLoc);
                    createMBR.getArguments().add(new MutableObject<ILogicalExpression>(secondaryKeyVarRef));
                    createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
                            new ConstantExpression(new AsterixConstantValue(new AInt32(dimension)))));
                    createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
                            new ConstantExpression(new AsterixConstantValue(new AInt32(i)))));
                    keyExprList.add(new MutableObject<ILogicalExpression>(createMBR));
                }
                secondaryExpressions.clear();
                for (LogicalVariable secondaryKeyVar : keyVarList) {
                    VariableReferenceExpression secondaryKeyVarRef = new VariableReferenceExpression(secondaryKeyVar);
                    secondaryKeyVarRef.setSourceLocation(sourceLoc);
                    secondaryExpressions.add(new MutableObject<ILogicalExpression>(secondaryKeyVarRef));
                }
                if (isPointMBR && isBulkload) {
                    //for PointMBR optimization: see SecondaryRTreeOperationsHelper.buildLoadingJobSpec() and
                    //createFieldPermutationForBulkLoadOp(int) for more details.
                    for (LogicalVariable secondaryKeyVar : keyVarList) {
                        VariableReferenceExpression secondaryKeyVarRef =
                                new VariableReferenceExpression(secondaryKeyVar);
                        secondaryKeyVarRef.setSourceLocation(sourceLoc);
                        secondaryExpressions.add(new MutableObject<ILogicalExpression>(secondaryKeyVarRef));
                    }
                }
                AssignOperator assignCoordinates = new AssignOperator(keyVarList, keyExprList);
                assignCoordinates.setSourceLocation(sourceLoc);
                assignCoordinates.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
                context.computeAndSetTypeEnvironmentForOperator(assignCoordinates);
                replicateOutput = assignCoordinates;
                boolean forceFilter = keyPairType.second;
                Mutable<ILogicalExpression> filterExpression = createAnyUnknownFilterExpression(keyVarList,
                        context.getOutputTypeEnvironment(assignCoordinates), forceFilter);
                AssignOperator originalAssignCoordinates = null;
                Mutable<ILogicalExpression> beforeOpFilterExpression = null;
                // We do something similar for beforeOp key if the operation is an upsert
                if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                    List<LogicalVariable> originalKeyVarList = new ArrayList<>();
                    List<Mutable<ILogicalExpression>> originalKeyExprList = new ArrayList<>();
                    // we don't do any filtering since nulls are expected here and there
                    for (int i = 0; i < numKeys; i++) {
                        LogicalVariable keyVar = context.newVar();
                        originalKeyVarList.add(keyVar);
                        AbstractFunctionCallExpression createMBR = new ScalarFunctionCallExpression(
                                FunctionUtil.getFunctionInfo(BuiltinFunctions.CREATE_MBR));
                        createMBR.setSourceLocation(sourceLoc);
                        createMBR.getArguments().add(
                                new MutableObject<>(beforeOpSecondaryExpressions.get(0).getValue().cloneExpression()));
                        createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
                                new ConstantExpression(new AsterixConstantValue(new AInt32(dimension)))));
                        createMBR.getArguments().add(new MutableObject<ILogicalExpression>(
                                new ConstantExpression(new AsterixConstantValue(new AInt32(i)))));
                        originalKeyExprList.add(new MutableObject<ILogicalExpression>(createMBR));
                    }
                    beforeOpSecondaryExpressions.clear();
                    for (LogicalVariable secondaryKeyVar : originalKeyVarList) {
                        VariableReferenceExpression secondaryKeyVarRef =
                                new VariableReferenceExpression(secondaryKeyVar);
                        secondaryKeyVarRef.setSourceLocation(sourceLoc);
                        beforeOpSecondaryExpressions.add(new MutableObject<ILogicalExpression>(secondaryKeyVarRef));
                    }
                    originalAssignCoordinates = new AssignOperator(originalKeyVarList, originalKeyExprList);
                    originalAssignCoordinates.setSourceLocation(sourceLoc);
                    originalAssignCoordinates.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
                    context.computeAndSetTypeEnvironmentForOperator(originalAssignCoordinates);
                    beforeOpFilterExpression = createAnyUnknownFilterExpression(originalKeyVarList,
                            context.getOutputTypeEnvironment(originalAssignCoordinates), forceFilter);
                }
                DataSourceIndex dataSourceIndex = new DataSourceIndex(index, database, dataverseName, datasetName, mp);
                indexUpdate = new IndexInsertDeleteUpsertOperator(dataSourceIndex,
                        OperatorManipulationUtil
                                .cloneExpressions(primaryIndexModificationOp.getPrimaryKeyExpressions()),
                        secondaryExpressions, filterExpression, beforeOpFilterExpression,
                        primaryIndexModificationOp.getOperation(), primaryIndexModificationOp.isBulkload(),
                        primaryIndexModificationOp.getAdditionalNonFilteringExpressions() == null ? 0
                                : primaryIndexModificationOp.getAdditionalNonFilteringExpressions().size());
                indexUpdate.setSourceLocation(sourceLoc);
                indexUpdate.setAdditionalFilteringExpressions(
                        OperatorManipulationUtil.cloneExpressions(filteringExpressions));
                if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                    // set before op secondary key expressions
                    if (filteringFields != null) {
                        VariableReferenceExpression varRef =
                                new VariableReferenceExpression(primaryIndexModificationOp.getBeforeOpFilterVar());
                        varRef.setSourceLocation(sourceLoc);
                        indexUpdate.setBeforeOpAdditionalFilteringExpression(
                                new MutableObject<ILogicalExpression>(varRef));
                    }
                    // set filtering expressions
                    indexUpdate.setBeforeOpSecondaryKeyExprs(beforeOpSecondaryExpressions);
                    // assign --> assign beforeOp values --> secondary index upsert
                    indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(originalAssignCoordinates));
                } else {
                    indexUpdate.getInputs().add(new MutableObject<ILogicalOperator>(assignCoordinates));
                }
            }

            if (primaryIndexModificationOp.getOperation() == Kind.UPSERT) {
                indexUpdate.setOperationExpr(new MutableObject<>(
                        new VariableReferenceExpression(primaryIndexModificationOp.getOperationVar())));
            }

            context.computeAndSetTypeEnvironmentForOperator(indexUpdate);
            if (!primaryIndexModificationOp.isBulkload() || secondaryIndexTotalCnt == 1) {
                currentTop = indexUpdate;
            } else {
                replicateOp.getOutputs().add(new MutableObject<>(replicateOutput));

                /* special treatment for bulk load with the existence of secondary primary index.
                 * the branch coming out of the replicate operator and feeding the index will not have the usual
                 * "blocking" sort operator since tuples are already sorted. We mark the materialization flag for that
                 * branch to make it blocking. Without "blocking", the activity cluster graph would be messed up
                 */
                if (index.isPrimaryKeyIndex()) {
                    int positionOfSecondaryPrimaryIndex = replicateOp.getOutputs().size() - 1;
                    replicateOp.getOutputMaterializationFlags()[positionOfSecondaryPrimaryIndex] = true;
                }
            }
            if (primaryIndexModificationOp.isBulkload()) {
                // For bulk load, we connect all fanned out insert operator to a single SINK operator
                op0.getInputs().add(new MutableObject<ILogicalOperator>(indexUpdate));
            }
        }

        if (!primaryIndexModificationOp.isBulkload()) {
            // If this is an upsert, we need to
            // Remove the current input to the SINK operator (It is actually already removed above)
            op0.getInputs().clear();
            // Connect the last index update to the SINK
            op0.getInputs().add(new MutableObject<ILogicalOperator>(currentTop));
        }
        return true;
    }