in algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java [88:200]
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
// In a cost-based optimizer, we would also try to propagate the
// parent's partitioning requirements.
AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
IPartitioningProperty pp1 = null;
IPartitioningProperty pp2 = null;
if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
switch (partitioningType) {
case PAIRWISE: {
pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch),
context.getComputationNodeDomain());
pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch),
context.getComputationNodeDomain());
break;
}
case BROADCAST: {
pp2 = new BroadcastPartitioningProperty(context.getComputationNodeDomain());
break;
}
default: {
throw new IllegalStateException();
}
}
}
pv[0] = new StructuralPropertiesVector(pp1, null);
pv[1] = new StructuralPropertiesVector(pp2, null);
IPartitioningRequirementsCoordinator prc;
switch (kind) {
case INNER: {
prc = IPartitioningRequirementsCoordinator.EQCLASS_PARTITIONING_COORDINATOR;
break;
}
case LEFT_OUTER: {
prc = new IPartitioningRequirementsCoordinator() {
@Override
public Pair<Boolean, IPartitioningProperty> coordinateRequirements(
IPartitioningProperty requirements, IPartitioningProperty firstDeliveredPartitioning,
ILogicalOperator op, IOptimizationContext context) throws AlgebricksException {
if (firstDeliveredPartitioning != null && firstDeliveredPartitioning
.getPartitioningType() == requirements.getPartitioningType()) {
switch (requirements.getPartitioningType()) {
case UNORDERED_PARTITIONED: {
UnorderedPartitionedProperty upp1 = (UnorderedPartitionedProperty) firstDeliveredPartitioning;
Set<LogicalVariable> set1 = upp1.getColumnSet();
UnorderedPartitionedProperty uppreq = (UnorderedPartitionedProperty) requirements;
Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
Set<LogicalVariable> keysCurrent = uppreq.getColumnSet();
List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent))
? keysRightBranch : keysLeftBranch;
List<LogicalVariable> keysSecond = keysFirst == keysRightBranch ? keysLeftBranch
: keysRightBranch;
for (LogicalVariable r : uppreq.getColumnSet()) {
EquivalenceClass ecSnd = eqmap.get(r);
boolean found = false;
int j = 0;
for (LogicalVariable rvar : keysFirst) {
if (rvar == r || ecSnd != null && eqmap.get(rvar) == ecSnd) {
found = true;
break;
}
j++;
}
if (!found) {
throw new IllegalStateException("Did not find a variable equivalent to " + r
+ " among " + keysFirst);
}
LogicalVariable v2 = keysSecond.get(j);
EquivalenceClass ecFst = eqmap.get(v2);
for (LogicalVariable vset1 : set1) {
if (vset1 == v2 || ecFst != null && eqmap.get(vset1) == ecFst) {
covered.add(vset1);
modifuppreq.add(r);
break;
}
}
if (covered.equals(set1)) {
break;
}
}
if (!covered.equals(set1)) {
throw new AlgebricksException("Could not modify " + requirements
+ " to agree with partitioning property " + firstDeliveredPartitioning
+ " delivered by previous input operator.");
}
UnorderedPartitionedProperty upp2 = new UnorderedPartitionedProperty(modifuppreq,
requirements.getNodeDomain());
return new Pair<Boolean, IPartitioningProperty>(false, upp2);
}
case ORDERED_PARTITIONED: {
throw new NotImplementedException();
}
}
}
return new Pair<Boolean, IPartitioningProperty>(true, requirements);
}
};
break;
}
default: {
throw new IllegalStateException();
}
}
return new PhysicalRequirements(pv, prc);
}