private static void computeDefaultPhysicalOp()

in algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java [122:384]


    /**
     * Assigns a default physical operator to {@code op} if it does not already have one, then
     * processes the operator's nested plans (if any) and recurses into its inputs.
     * <p>
     * The physical operator is chosen from the logical operator tag. For GROUP and ORDER the
     * choice additionally depends on {@code topLevelOp}. Operator tags without a case in the
     * switch below are left without a physical operator by this method.
     *
     * @param op
     *            the logical operator to process; its inputs are visited recursively
     * @param topLevelOp
     *            passed unchanged when recursing into inputs; nested plans are processed through
     *            {@code setPhysicalOperators} with {@code false}. Selects between the
     *            top-level and the micro / in-memory implementations for GROUP and ORDER
     * @param context
     *            supplies the physical optimization configuration and the metadata provider
     * @throws AlgebricksException
     *             if an EXCHANGE operator has no physical operator set, if an ORDER expression
     *             is not a variable reference, or if a called helper throws it. A
     *             {@code NotImplementedException} is thrown for a hash/external group-by
     *             annotation on a group-by that is not at the top level
     */
    private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
            IOptimizationContext context) throws AlgebricksException {
        PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
        if (op.getPhysicalOperator() == null) {
            switch (op.getOperatorTag()) {
                case AGGREGATE: {
                    op.setPhysicalOperator(new AggregatePOperator());
                    break;
                }
                case ASSIGN: {
                    op.setPhysicalOperator(new AssignPOperator());
                    break;
                }
                case DISTINCT: {
                    DistinctOperator distinct = (DistinctOperator) op;
                    distinct.setPhysicalOperator(new PreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
                    break;
                }
                case EMPTYTUPLESOURCE: {
                    op.setPhysicalOperator(new EmptyTupleSourcePOperator());
                    break;
                }
                case EXCHANGE: {
                    // The implementation choice for an exchange should be set by a parent op. This
                    // branch is only reached while the physical operator is still null (see the
                    // enclosing check), so the exchange was never configured.
                    throw new AlgebricksException("Implementation for EXCHANGE operator was not set.");
                }
                case GROUP: {
                    GroupByOperator gby = (GroupByOperator) op;

                    // The external group-by is only considered for a single nested plan with a single
                    // root that carries a hash or external group-by annotation. Boolean.TRUE.equals is
                    // used instead of '==' so that any Boolean holding true is honored, not just the
                    // canonical Boolean.TRUE instance; it is also null-safe for a missing annotation.
                    if (gby.getNestedPlans().size() == 1 && gby.getNestedPlans().get(0).getRoots().size() == 1
                            && (Boolean.TRUE.equals(gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY))
                                    || Boolean.TRUE.equals(
                                            gby.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY)))) {
                        if (!topLevelOp) {
                            throw new NotImplementedException(
                                    "External hash group-by for nested grouping is not implemented.");
                        }

                        boolean hasIntermediateAgg = generateMergeAggregationExpressions(gby, context);
                        if (hasIntermediateAgg) {
                            // The last argument is a byte budget: frames * frame size, widened to long
                            // before multiplying to avoid int overflow.
                            ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator(
                                    gby.getGroupByList(), physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
                                    physicalOptimizationConfig.getExternalGroupByTableSize(),
                                    (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
                                            * physicalOptimizationConfig.getFrameSize());
                            op.setPhysicalOperator(externalGby);
                            break;
                        }
                        // Otherwise fall back to the pre-clustered group-by below.
                    }

                    // Only group-by expressions that are plain variable references contribute a column.
                    List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
                    List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
                    for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
                        ILogicalExpression expr = p.second.getValue();
                        if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
                            VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
                            columnList.add(varRef.getVariableReference());
                        }
                    }
                    if (topLevelOp) {
                        op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList));
                    } else {
                        op.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(columnList));
                    }
                    break;
                }
                case INNERJOIN: {
                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
                    break;
                }
                case LEFTOUTERJOIN: {
                    JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context);
                    break;
                }
                case LIMIT: {
                    op.setPhysicalOperator(new StreamLimitPOperator());
                    break;
                }
                case NESTEDTUPLESOURCE: {
                    op.setPhysicalOperator(new NestedTupleSourcePOperator());
                    break;
                }
                case ORDER: {
                    OrderOperator oo = (OrderOperator) op;
                    // Every order expression must already be a variable reference at this point.
                    for (Pair<IOrder, Mutable<ILogicalExpression>> p : oo.getOrderExpressions()) {
                        ILogicalExpression e = p.second.getValue();
                        if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
                            throw new AlgebricksException("Order expression " + e + " has not been normalized.");
                        }
                    }
                    if (topLevelOp) {
                        op.setPhysicalOperator(new StableSortPOperator(
                                physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK()));
                    } else {
                        op.setPhysicalOperator(new InMemoryStableSortPOperator());
                    }
                    break;
                }
                case PROJECT: {
                    op.setPhysicalOperator(new StreamProjectPOperator());
                    break;
                }
                case RUNNINGAGGREGATE: {
                    op.setPhysicalOperator(new RunningAggregatePOperator());
                    break;
                }
                case REPLICATE: {
                    op.setPhysicalOperator(new ReplicatePOperator());
                    break;
                }
                case SCRIPT: {
                    op.setPhysicalOperator(new StringStreamingScriptPOperator());
                    break;
                }
                case SELECT: {
                    op.setPhysicalOperator(new StreamSelectPOperator());
                    break;
                }
                case SUBPLAN: {
                    op.setPhysicalOperator(new SubplanPOperator());
                    break;
                }
                case UNIONALL: {
                    op.setPhysicalOperator(new UnionAllPOperator());
                    break;
                }
                case INTERSECT: {
                    op.setPhysicalOperator(new IntersectPOperator());
                    break;
                }
                case UNNEST: {
                    op.setPhysicalOperator(new UnnestPOperator());
                    break;
                }
                case DATASOURCESCAN: {
                    DataSourceScanOperator scan = (DataSourceScanOperator) op;
                    IDataSource dataSource = scan.getDataSource();
                    DataSourceScanPOperator dss = new DataSourceScanPOperator(dataSource);
                    IMetadataProvider mp = context.getMetadataProvider();
                    if (mp.scannerOperatorIsLeaf(dataSource)) {
                        dss.disableJobGenBelowMe();
                    }
                    op.setPhysicalOperator(dss);
                    break;
                }
                case WRITE: {
                    op.setPhysicalOperator(new SinkWritePOperator());
                    break;
                }
                case DISTRIBUTE_RESULT: {
                    op.setPhysicalOperator(new DistributeResultPOperator());
                    break;
                }
                case WRITE_RESULT: {
                    WriteResultOperator opLoad = (WriteResultOperator) op;
                    LogicalVariable payload;
                    List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
                    // Stays null when the operator has no additional filtering expressions.
                    List<LogicalVariable> additionalFilteringKeys = null;
                    payload = getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getKeyExpressions(), keys);
                    if (opLoad.getAdditionalFilteringExpressions() != null) {
                        additionalFilteringKeys = new ArrayList<LogicalVariable>();
                        getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                    }
                    op.setPhysicalOperator(
                            new WriteResultPOperator(opLoad.getDataSource(), payload, keys, additionalFilteringKeys));
                    break;
                }
                case INSERT_DELETE_UPSERT: {
                    // Primary index
                    InsertDeleteUpsertOperator opLoad = (InsertDeleteUpsertOperator) op;
                    LogicalVariable payload;
                    List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
                    // Both optional lists stay null when the corresponding expressions are absent.
                    List<LogicalVariable> additionalFilteringKeys = null;
                    List<LogicalVariable> additionalNonFilterVariables = null;
                    if (opLoad.getAdditionalNonFilteringExpressions() != null) {
                        additionalNonFilterVariables = new ArrayList<LogicalVariable>();
                        getKeys(opLoad.getAdditionalNonFilteringExpressions(), additionalNonFilterVariables);
                    }
                    payload = getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getPrimaryKeyExpressions(), keys);
                    if (opLoad.getAdditionalFilteringExpressions() != null) {
                        additionalFilteringKeys = new ArrayList<LogicalVariable>();
                        getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                    }
                    if (opLoad.isBulkload()) {
                        op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys,
                                additionalNonFilterVariables, opLoad.getDataSource()));
                    } else {
                        op.setPhysicalOperator(new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys,
                                opLoad.getDataSource(), opLoad.getOperation(), additionalNonFilterVariables));
                    }
                    break;
                }
                case INDEX_INSERT_DELETE_UPSERT: {
                    // Secondary index
                    IndexInsertDeleteUpsertOperator opInsDel = (IndexInsertDeleteUpsertOperator) op;
                    List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
                    List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
                    List<LogicalVariable> additionalFilteringKeys = null;
                    getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
                    getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
                    if (opInsDel.getAdditionalFilteringExpressions() != null) {
                        additionalFilteringKeys = new ArrayList<LogicalVariable>();
                        getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                    }
                    if (opInsDel.isBulkload()) {
                        op.setPhysicalOperator(
                                new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
                                        opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
                    } else {
                        // The "previous" keys are only collected for UPSERT; they stay null otherwise.
                        List<LogicalVariable> prevSecondaryKeys = null;
                        LogicalVariable prevAdditionalFilteringKey = null;
                        if (opInsDel.getOperation() == Kind.UPSERT) {
                            prevSecondaryKeys = new ArrayList<LogicalVariable>();
                            getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
                            if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
                                prevAdditionalFilteringKey = ((VariableReferenceExpression) (opInsDel
                                        .getPrevAdditionalFilteringExpression()).getValue()).getVariableReference();
                            }
                        }
                        op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
                                additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
                                prevSecondaryKeys, prevAdditionalFilteringKey));
                    }
                    break;
                }
                case TOKENIZE: {
                    TokenizeOperator opTokenize = (TokenizeOperator) op;
                    List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
                    List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
                    getKeys(opTokenize.getPrimaryKeyExpressions(), primaryKeys);
                    getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
                    // Tokenize Operator only operates with a bulk load on a data set with an index.
                    // NOTE: a non-bulkload tokenize is left without a physical operator here.
                    if (opTokenize.isBulkload()) {
                        op.setPhysicalOperator(
                                new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize.getDataSourceIndex()));
                    }
                    break;
                }
                case SINK: {
                    op.setPhysicalOperator(new SinkPOperator());
                    break;
                }
                default: {
                    // No default physical operator is assigned for any other operator tag; the
                    // operator keeps a null physical operator as far as this method is concerned.
                    break;
                }
            }
        }
        // Nested plans are always processed as non-top-level.
        if (op.hasNestedPlans()) {
            AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
            for (ILogicalPlan p : nested.getNestedPlans()) {
                setPhysicalOperators(p, false, context);
            }
        }
        // Inputs inherit the caller's top-level flag.
        for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
            computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), topLevelOp, context);
        }
    }