private boolean rewriteForOneEquivalentClass()

in algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/ExtractCommonOperatorsRule.java [142:261]


    /**
     * Rewrites the operators of one equivalence class so that isomorphic members are served by a
     * single shared operator whose output is fanned out through a {@code ReplicateOperator}.
     * <p>
     * Each round removes the last remaining member, gathers every other member that
     * {@code IsomorphismUtilities.isOperatorIsomorphic} reports as isomorphic to it, and asks
     * {@code computeMaterilizationFlags} for the replicate operator's flags. If the group is
     * non-empty afterwards, the first operator of the group becomes the replicate operator's input
     * (through a one-to-one exchange unless it already is an exchange). Its parents are re-pointed at
     * the replicate operator, and the parents of every other group member are re-pointed at an
     * assign/project pair that reads from the replicate operator.
     *
     * @param members
     *            operator references of one equivalence class; this list is emptied by the call
     * @param context
     *            optimization context used to recompute type environments of new and re-wired operators
     * @return {@code true} if at least one group was rewritten, {@code false} otherwise
     * @throws AlgebricksException
     *             propagated from the isomorphism, variable and type-environment utilities called here
     */
    private boolean rewriteForOneEquivalentClass(List<Mutable<ILogicalOperator>> members, IOptimizationContext context)
            throws AlgebricksException {
        boolean changed = false;
        List<Mutable<ILogicalOperator>> group = new ArrayList<Mutable<ILogicalOperator>>();
        while (!members.isEmpty()) {
            group.clear();
            Mutable<ILogicalOperator> seed = members.remove(members.size() - 1);
            group.add(seed);
            // Scan from the tail so that removing by index never shifts a position still to be visited.
            for (int pos = members.size() - 1; pos >= 0; pos--) {
                Mutable<ILogicalOperator> other = members.get(pos);
                if (!IsomorphismUtilities.isOperatorIsomorphic(seed.getValue(), other.getValue())) {
                    continue;
                }
                group.add(other);
                members.remove(pos);
            }

            boolean[] materializationFlags = computeMaterilizationFlags(group);
            // NOTE(review): the group is re-checked here, so computeMaterilizationFlags presumably
            // may prune it - confirm against that method.
            if (group.isEmpty()) {
                continue;
            }

            // The first operator left in the group is the one that is kept and shared.
            Mutable<ILogicalOperator> sharedRef = group.get(0);
            ReplicateOperator replicate = new ReplicateOperator(group.size(), materializationFlags);
            replicate.setPhysicalOperator(new ReplicatePOperator());
            Mutable<ILogicalOperator> replicateRef = new MutableObject<ILogicalOperator>(replicate);
            AbstractLogicalOperator sharedOp = (AbstractLogicalOperator) sharedRef.getValue();
            List<Mutable<ILogicalOperator>> sharedParents = childrenToParents.get(sharedRef);

            replicate.setExecutionMode(sharedOp.getExecutionMode());
            if (sharedOp.getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
                // Put a one-to-one exchange between the shared operator and the replicate operator.
                AbstractLogicalOperator inputExchange = new ExchangeOperator();
                inputExchange.setPhysicalOperator(new OneToOneExchangePOperator());
                inputExchange.setExecutionMode(replicate.getExecutionMode());
                Mutable<ILogicalOperator> inputExchangeRef = new MutableObject<ILogicalOperator>(inputExchange);
                inputExchange.getInputs().add(sharedRef);
                context.computeAndSetTypeEnvironmentForOperator(inputExchange);
                replicate.getInputs().add(inputExchangeRef);
            } else {
                // The shared operator already is an exchange, so it feeds the replicate operator directly.
                replicate.getInputs().add(sharedRef);
            }
            context.computeAndSetTypeEnvironmentForOperator(replicate);

            // Re-point the original parents of the shared operator at the replicate operator.
            for (Mutable<ILogicalOperator> sharedParentRef : sharedParents) {
                AbstractLogicalOperator sharedParent = (AbstractLogicalOperator) sharedParentRef.getValue();
                int inputPos = sharedParent.getInputs().indexOf(sharedRef);
                if (sharedParent.getOperatorTag() != LogicalOperatorTag.EXCHANGE) {
                    AbstractLogicalOperator outputExchange = new ExchangeOperator();
                    outputExchange.setPhysicalOperator(new OneToOneExchangePOperator());
                    outputExchange.setExecutionMode(replicate.getExecutionMode());
                    MutableObject<ILogicalOperator> outputExchangeRef =
                            new MutableObject<ILogicalOperator>(outputExchange);
                    outputExchange.getInputs().add(replicateRef);
                    replicate.getOutputs().add(outputExchangeRef);
                    context.computeAndSetTypeEnvironmentForOperator(outputExchange);
                    sharedParent.getInputs().set(inputPos, outputExchangeRef);
                    context.computeAndSetTypeEnvironmentForOperator(sharedParent);
                } else {
                    // An exchange parent reads from the replicate operator without an extra exchange.
                    sharedParent.getInputs().set(inputPos, replicateRef);
                    replicate.getOutputs().add(sharedParentRef);
                }
            }

            // One variable-reference expression per live variable of the shared operator; the same
            // expression list is handed to the assign operator built for every other group member.
            List<LogicalVariable> sharedLiveVars = new ArrayList<LogicalVariable>();
            VariableUtilities.getLiveVariables(sharedRef.getValue(), sharedLiveVars);
            ArrayList<Mutable<ILogicalExpression>> sharedVarRefs = new ArrayList<Mutable<ILogicalExpression>>();
            for (LogicalVariable sharedVar : sharedLiveVars) {
                sharedVarRefs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(sharedVar)));
            }

            for (Mutable<ILogicalOperator> memberRef : group) {
                if (memberRef.equals(sharedRef)) {
                    continue;
                }

                // Look up, for each live variable of the shared operator, the variable recorded for it
                // in the mapping filled by mapVariablesTopDown.
                ArrayList<LogicalVariable> memberVars = new ArrayList<LogicalVariable>();
                Map<LogicalVariable, LogicalVariable> sharedToMemberVar = new HashMap<LogicalVariable, LogicalVariable>();
                IsomorphismUtilities.mapVariablesTopDown(memberRef.getValue(), sharedRef.getValue(),
                        sharedToMemberVar);
                for (LogicalVariable sharedVar : sharedLiveVars) {
                    memberVars.add(sharedToMemberVar.get(sharedVar));
                }

                // Build the chain: replicate -> exchange -> assign -> project.
                AbstractLogicalOperator assignOp = new AssignOperator(memberVars, sharedVarRefs);
                assignOp.setExecutionMode(replicate.getExecutionMode());
                assignOp.setPhysicalOperator(new AssignPOperator());
                AbstractLogicalOperator projectOp = new ProjectOperator(memberVars);
                projectOp.setPhysicalOperator(new StreamProjectPOperator());
                projectOp.setExecutionMode(replicate.getExecutionMode());
                AbstractLogicalOperator branchExchange = new ExchangeOperator();
                branchExchange.setPhysicalOperator(new OneToOneExchangePOperator());
                branchExchange.setExecutionMode(replicate.getExecutionMode());
                branchExchange.getInputs().add(replicateRef);
                MutableObject<ILogicalOperator> branchExchangeRef =
                        new MutableObject<ILogicalOperator>(branchExchange);
                replicate.getOutputs().add(branchExchangeRef);
                assignOp.getInputs().add(branchExchangeRef);
                projectOp.getInputs().add(new MutableObject<ILogicalOperator>(assignOp));

                // Type environments are computed bottom-up along the new chain.
                context.computeAndSetTypeEnvironmentForOperator(branchExchange);
                context.computeAndSetTypeEnvironmentForOperator(assignOp);
                context.computeAndSetTypeEnvironmentForOperator(projectOp);

                List<Mutable<ILogicalOperator>> memberParents = childrenToParents.get(memberRef);
                for (Mutable<ILogicalOperator> memberParentRef : memberParents) {
                    AbstractLogicalOperator memberParent = (AbstractLogicalOperator) memberParentRef.getValue();
                    int inputPos = memberParent.getInputs().indexOf(memberRef);
                    // A project parent is attached to the assign operator; any other parent is
                    // attached to the new project operator.
                    ILogicalOperator replacement;
                    if (memberParent.getOperatorTag() == LogicalOperatorTag.PROJECT) {
                        replacement = assignOp;
                    } else {
                        replacement = projectOp;
                    }
                    if (HeuristicOptimizer.isHyracksOp(memberParent.getPhysicalOperator().getOperatorTag())) {
                        // If the parent operator is a hyracks operator,
                        // an extra one-to-one exchange is needed.
                        AbstractLogicalOperator parentExchange = new ExchangeOperator();
                        parentExchange.setPhysicalOperator(new OneToOneExchangePOperator());
                        parentExchange.setExecutionMode(replacement.getExecutionMode());
                        parentExchange.getInputs().add(new MutableObject<ILogicalOperator>(replacement));
                        memberParent.getInputs().set(inputPos, new MutableObject<ILogicalOperator>(parentExchange));
                        context.computeAndSetTypeEnvironmentForOperator(parentExchange);
                    } else {
                        memberParent.getInputs().set(inputPos, new MutableObject<ILogicalOperator>(replacement));
                    }
                    context.computeAndSetTypeEnvironmentForOperator(memberParent);
                }
            }
            changed = true;
        }
        return changed;
    }