in algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java [122:384]
/**
 * Assigns a default physical operator to {@code op} when it does not have one yet, then recursively does the same
 * for the operators in its nested plans and for all of its inputs.
 * <p>
 * The choice is driven by the logical operator tag. An operator whose physical operator is already set keeps it.
 * Operator tags that have no case below are left unassigned. Nested plans are handed to {@code setPhysicalOperators}
 * with {@code false} as the top-level flag. Below the top level, ORDER gets an in-memory sort and GROUP gets a micro
 * pre-clustered group-by.
 *
 * @param op
 *            the operator to process; its nested plans and inputs are processed as well
 * @param topLevelOp
 *            {@code true} if {@code op} belongs to the top-level plan, {@code false} if it lives inside a nested plan
 * @param context
 *            the optimization context supplying the physical optimization config and the metadata provider
 * @throws AlgebricksException
 *             if an EXCHANGE operator reaches this method without an implementation, if an ORDER expression is not a
 *             variable reference, or if one of the helpers invoked here fails
 */
private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
        IOptimizationContext context) throws AlgebricksException {
    PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
    if (op.getPhysicalOperator() == null) {
        switch (op.getOperatorTag()) {
            case AGGREGATE: {
                op.setPhysicalOperator(new AggregatePOperator());
                break;
            }
            case ASSIGN: {
                op.setPhysicalOperator(new AssignPOperator());
                break;
            }
            case DISTINCT: {
                DistinctOperator distinct = (DistinctOperator) op;
                distinct.setPhysicalOperator(new PreSortedDistinctByPOperator(distinct.getDistinctByVarList()));
                break;
            }
            case EMPTYTUPLESOURCE: {
                op.setPhysicalOperator(new EmptyTupleSourcePOperator());
                break;
            }
            case EXCHANGE: {
                // The implementation of an exchange must be chosen by its parent operator. We are inside the
                // "physical operator is null" branch, so reaching an EXCHANGE here means no parent chose one.
                throw new AlgebricksException("Implementation for EXCHANGE operator was not set.");
            }
            case GROUP: {
                computeGroupByPhysicalOp((GroupByOperator) op, topLevelOp, physicalOptimizationConfig, context);
                break;
            }
            case INNERJOIN: {
                JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
                break;
            }
            case LEFTOUTERJOIN: {
                JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context);
                break;
            }
            case LIMIT: {
                op.setPhysicalOperator(new StreamLimitPOperator());
                break;
            }
            case NESTEDTUPLESOURCE: {
                op.setPhysicalOperator(new NestedTupleSourcePOperator());
                break;
            }
            case ORDER: {
                OrderOperator oo = (OrderOperator) op;
                // Sort keys are expected to have been normalized to plain variable references by earlier rules.
                for (Pair<IOrder, Mutable<ILogicalExpression>> p : oo.getOrderExpressions()) {
                    ILogicalExpression e = p.second.getValue();
                    if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
                        throw new AlgebricksException("Order expression " + e + " has not been normalized.");
                    }
                }
                if (topLevelOp) {
                    op.setPhysicalOperator(new StableSortPOperator(
                            physicalOptimizationConfig.getMaxFramesExternalSort(), oo.getTopK()));
                } else {
                    op.setPhysicalOperator(new InMemoryStableSortPOperator());
                }
                break;
            }
            case PROJECT: {
                op.setPhysicalOperator(new StreamProjectPOperator());
                break;
            }
            case RUNNINGAGGREGATE: {
                op.setPhysicalOperator(new RunningAggregatePOperator());
                break;
            }
            case REPLICATE: {
                op.setPhysicalOperator(new ReplicatePOperator());
                break;
            }
            case SCRIPT: {
                op.setPhysicalOperator(new StringStreamingScriptPOperator());
                break;
            }
            case SELECT: {
                op.setPhysicalOperator(new StreamSelectPOperator());
                break;
            }
            case SUBPLAN: {
                op.setPhysicalOperator(new SubplanPOperator());
                break;
            }
            case UNIONALL: {
                op.setPhysicalOperator(new UnionAllPOperator());
                break;
            }
            case INTERSECT: {
                op.setPhysicalOperator(new IntersectPOperator());
                break;
            }
            case UNNEST: {
                op.setPhysicalOperator(new UnnestPOperator());
                break;
            }
            case DATASOURCESCAN: {
                DataSourceScanOperator scan = (DataSourceScanOperator) op;
                IDataSource dataSource = scan.getDataSource();
                DataSourceScanPOperator dss = new DataSourceScanPOperator(dataSource);
                IMetadataProvider mp = context.getMetadataProvider();
                // If the metadata provider reports this scanner as a leaf, no job generation happens below it.
                if (mp.scannerOperatorIsLeaf(dataSource)) {
                    dss.disableJobGenBelowMe();
                }
                op.setPhysicalOperator(dss);
                break;
            }
            case WRITE: {
                op.setPhysicalOperator(new SinkWritePOperator());
                break;
            }
            case DISTRIBUTE_RESULT: {
                op.setPhysicalOperator(new DistributeResultPOperator());
                break;
            }
            case WRITE_RESULT: {
                WriteResultOperator opLoad = (WriteResultOperator) op;
                List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
                // Fills 'keys' from the key expressions and returns the payload variable.
                LogicalVariable payload = getKeysAndLoad(opLoad.getPayloadExpression(), opLoad.getKeyExpressions(),
                        keys);
                List<LogicalVariable> additionalFilteringKeys = null;
                if (opLoad.getAdditionalFilteringExpressions() != null) {
                    additionalFilteringKeys = new ArrayList<LogicalVariable>();
                    getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                }
                op.setPhysicalOperator(
                        new WriteResultPOperator(opLoad.getDataSource(), payload, keys, additionalFilteringKeys));
                break;
            }
            case INSERT_DELETE_UPSERT: {
                // Primary index
                InsertDeleteUpsertOperator opLoad = (InsertDeleteUpsertOperator) op;
                List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
                List<LogicalVariable> additionalFilteringKeys = null;
                List<LogicalVariable> additionalNonFilterVariables = null;
                if (opLoad.getAdditionalNonFilteringExpressions() != null) {
                    additionalNonFilterVariables = new ArrayList<LogicalVariable>();
                    getKeys(opLoad.getAdditionalNonFilteringExpressions(), additionalNonFilterVariables);
                }
                LogicalVariable payload = getKeysAndLoad(opLoad.getPayloadExpression(),
                        opLoad.getPrimaryKeyExpressions(), keys);
                if (opLoad.getAdditionalFilteringExpressions() != null) {
                    additionalFilteringKeys = new ArrayList<LogicalVariable>();
                    getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                }
                if (opLoad.isBulkload()) {
                    op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys,
                            additionalNonFilterVariables, opLoad.getDataSource()));
                } else {
                    op.setPhysicalOperator(new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys,
                            opLoad.getDataSource(), opLoad.getOperation(), additionalNonFilterVariables));
                }
                break;
            }
            case INDEX_INSERT_DELETE_UPSERT: {
                // Secondary index
                IndexInsertDeleteUpsertOperator opInsDel = (IndexInsertDeleteUpsertOperator) op;
                List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
                List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
                List<LogicalVariable> additionalFilteringKeys = null;
                getKeys(opInsDel.getPrimaryKeyExpressions(), primaryKeys);
                getKeys(opInsDel.getSecondaryKeyExpressions(), secondaryKeys);
                if (opInsDel.getAdditionalFilteringExpressions() != null) {
                    additionalFilteringKeys = new ArrayList<LogicalVariable>();
                    getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
                }
                if (opInsDel.isBulkload()) {
                    op.setPhysicalOperator(
                            new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
                                    opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
                } else {
                    // The "previous" secondary keys and filter value are only collected for UPSERT.
                    List<LogicalVariable> prevSecondaryKeys = null;
                    LogicalVariable prevAdditionalFilteringKey = null;
                    if (opInsDel.getOperation() == Kind.UPSERT) {
                        prevSecondaryKeys = new ArrayList<LogicalVariable>();
                        getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
                        if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
                            prevAdditionalFilteringKey = ((VariableReferenceExpression) (opInsDel
                                    .getPrevAdditionalFilteringExpression()).getValue()).getVariableReference();
                        }
                    }
                    op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
                            additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
                            prevSecondaryKeys, prevAdditionalFilteringKey));
                }
                break;
            }
            case TOKENIZE: {
                TokenizeOperator opTokenize = (TokenizeOperator) op;
                List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
                List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
                getKeys(opTokenize.getPrimaryKeyExpressions(), primaryKeys);
                getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
                // Tokenize Operator only operates with a bulk load on a data set with an index.
                // Outside of a bulk load no physical operator is assigned here.
                if (opTokenize.isBulkload()) {
                    op.setPhysicalOperator(
                            new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize.getDataSourceIndex()));
                }
                break;
            }
            case SINK: {
                op.setPhysicalOperator(new SinkPOperator());
                break;
            }
        }
    }
    // Operators inside nested plans are processed as non-top-level operators.
    if (op.hasNestedPlans()) {
        AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
        for (ILogicalPlan p : nested.getNestedPlans()) {
            setPhysicalOperators(p, false, context);
        }
    }
    // Inputs belong to the same plan as 'op', so they inherit its top-level flag.
    for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
        computeDefaultPhysicalOp((AbstractLogicalOperator) opRef.getValue(), topLevelOp, context);
    }
}

/**
 * Chooses and sets the physical implementation of a GROUP BY operator.
 * <p>
 * An {@link ExternalGroupByPOperator} is used when all of the following hold:
 * <ul>
 * <li>the operator has exactly one nested plan with exactly one root;</li>
 * <li>it carries a {@code USE_HASH_GROUP_BY} or {@code USE_EXTERNAL_GROUP_BY} annotation equal to {@code true};</li>
 * <li>{@code generateMergeAggregationExpressions} returns {@code true}.</li>
 * </ul>
 * Otherwise a pre-clustered group-by over the variable-valued grouping keys is used:
 * {@link PreclusteredGroupByPOperator} at the top level and {@link MicroPreclusteredGroupByPOperator} inside nested
 * plans.
 *
 * @param gby
 *            the group-by operator to configure
 * @param topLevelOp
 *            whether {@code gby} belongs to the top-level plan
 * @param physicalOptimizationConfig
 *            supplies frame budget, table size and frame size for the external group-by
 * @param context
 *            the optimization context passed on to {@code generateMergeAggregationExpressions}
 * @throws NotImplementedException
 *             if an external group-by is requested for a group-by inside a nested plan
 * @throws AlgebricksException
 *             if generating the merge aggregation expressions fails
 */
private static void computeGroupByPhysicalOp(GroupByOperator gby, boolean topLevelOp,
        PhysicalOptimizationConfig physicalOptimizationConfig, IOptimizationContext context)
        throws AlgebricksException {
    boolean hasSingleRootedNestedPlan = gby.getNestedPlans().size() == 1
            && gby.getNestedPlans().get(0).getRoots().size() == 1;
    // Compare the boxed annotation values with equals(), not '==', so any Boolean instance holding true counts.
    if (hasSingleRootedNestedPlan
            && (Boolean.TRUE.equals(gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY))
                    || Boolean.TRUE.equals(gby.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY)))) {
        if (!topLevelOp) {
            throw new NotImplementedException("External hash group-by for nested grouping is not implemented.");
        }
        boolean hasIntermediateAgg = generateMergeAggregationExpressions(gby, context);
        if (hasIntermediateAgg) {
            // The long cast widens before multiplying, so the memory budget cannot overflow int.
            gby.setPhysicalOperator(new ExternalGroupByPOperator(gby.getGroupByList(),
                    physicalOptimizationConfig.getMaxFramesExternalGroupBy(),
                    physicalOptimizationConfig.getExternalGroupByTableSize(),
                    (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy()
                            * physicalOptimizationConfig.getFrameSize()));
            return;
        }
        // No intermediate aggregation available: fall back to the pre-clustered implementation below.
    }
    List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gbyList = gby.getGroupByList();
    List<LogicalVariable> columnList = new ArrayList<LogicalVariable>(gbyList.size());
    // Only grouping keys that are plain variable references contribute columns; other expressions are skipped.
    for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gbyList) {
        ILogicalExpression expr = p.second.getValue();
        if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
            columnList.add(((VariableReferenceExpression) expr).getVariableReference());
        }
    }
    if (topLevelOp) {
        gby.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList));
    } else {
        gby.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(columnList));
    }
}