in hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java [344:716]
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
final ITuplePairComparator nljComparatorProbe2Build = tuplePairComparatorFactoryProbe2Build
.createTuplePairComparator(ctx);
final ITuplePairComparator nljComparatorBuild2Probe = tuplePairComparatorFactoryBuild2Probe
.createTuplePairComparator(ctx);
final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
.createPredicateEvaluator());
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
final ArrayTupleBuilder nullTupleBuild = isLeftOuter ?
new ArrayTupleBuilder(buildRd.getFieldCount()) :
null;
if (isLeftOuter) {
DataOutput out = nullTupleBuild.getDataOutput();
for (int i = 0; i < nullWriterFactories1.length; i++) {
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
nullWriters1[i].writeNull(out);
nullTupleBuild.addFieldEndOffset();
}
}
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private BuildAndPartitionTaskState state;
private IFrame rPartbuff = new VSizeFrame(ctx);
private FrameTupleAppender nullResultAppender = null;
private FrameTupleAccessor probeTupleAccessor;
/**
 * Fetches the task state stored under the build-and-partition activity for this
 * partition, opens the downstream writer and initializes the probe phase of the join.
 */
@Override
public void open() throws HyracksDataException {
    final ActivityId buildActivity = new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID);
    final TaskId buildTask = new TaskId(buildActivity, partition);
    state = (BuildAndPartitionTaskState) ctx.getStateObject(buildTask);

    writer.open();
    state.hybridHJ.initProbe();

    if (!LOGGER.isLoggable(Level.FINE)) {
        return;
    }
    LOGGER.fine("OptimizedHybridHashJoin is starting the probe phase.");
}
/**
 * Hands the incoming frame to the hybrid hash join kept in the task state for probing,
 * passing the downstream writer along with it.
 */
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
state.hybridHJ.probe(buffer, writer);
}
/** Propagates the failure notification to the downstream writer. */
@Override
public void fail() throws HyracksDataException {
writer.fail();
}
/**
 * Closes the probe phase and then walks every partition whose bit is set in the join's
 * partition status. A partition with both a build and a probe run file is joined through
 * {@code joinPartitionPair} at level 1. For a left-outer join, a partition that only has a
 * probe run file is emitted through {@code appendNullToProbeTuples}. Any other partition
 * is skipped. The downstream writer is closed whether or not the work above succeeds.
 */
@Override
public void close() throws HyracksDataException {
    try {
        state.hybridHJ.closeProbe(writer);

        final BitSet status = state.hybridHJ.getPartitionStatus();
        rPartbuff.reset();

        int pid = status.nextSetBit(0);
        while (pid >= 0) {
            final RunFileReader buildReader = state.hybridHJ.getBuildRFReader(pid);
            final RunFileReader probeReader = state.hybridHJ.getProbeRFReader(pid);

            if (buildReader != null && probeReader != null) {
                final int buildTuples = state.hybridHJ.getBuildPartitionSizeInTup(pid);
                final int probeTuples = state.hybridHJ.getProbePartitionSizeInTup(pid);
                joinPartitionPair(buildReader, probeReader, buildTuples, probeTuples, 1);
            } else if (isLeftOuter && probeReader != null) {
                // Only the probe side exists: every probe tuple is still part of the outer result.
                appendNullToProbeTuples(probeReader);
            }

            pid = status.nextSetBit(pid + 1);
        }
    } finally {
        writer.close();
    }

    if (LOGGER.isLoggable(Level.FINE)) {
        LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
    }
}
/**
 * Joins one pair of build/probe run files, picking the strategy from their sizes.
 * <p>
 * Sizes are computed as the run file size divided by the context's initial frame size.
 * Unless {@code skipInMemoryHJ} is set, an in-memory hash join is used when the build size
 * is below {@code state.memForJoin}, or when the probe size is below it and the join is not
 * left-outer. Otherwise the pair goes through another round of hybrid hash join. In both
 * cases the original build side keeps the build role when {@code forceRR} is not set and
 * either the join is left-outer or the build size is smaller than the probe size; in every
 * other case the roles are reversed.
 * <p>
 * The buildSideReader must always be the reader of the original build side, and the
 * probeSideReader the reader of the original probe side, regardless of earlier role reversals.
 *
 * @param buildSideReader  run file of the original build side
 * @param probeSideReader  run file of the original probe side
 * @param buildSizeInTuple number of tuples in the build partition
 * @param probeSizeInTuple number of tuples in the probe partition
 * @param level            recursion level, passed to the hash partitioner factories
 * @throws HyracksDataException if the side chosen for the in-memory table has zero tuples,
 *                              or if the delegated join fails
 */
private void joinPartitionPair(RunFileReader buildSideReader, RunFileReader probeSideReader,
        int buildSizeInTuple, int probeSizeInTuple, int level)
        throws HyracksDataException {
    ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
            hashFunctionGeneratorFactories).createPartitioner(level);
    ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
            hashFunctionGeneratorFactories).createPartitioner(level);
    long buildPartSize = buildSideReader.getFileSize() / ctx.getInitialFrameSize();
    long probePartSize = probeSideReader.getFileSize() / ctx.getInitialFrameSize();
    int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);
    if (LOGGER.isLoggable(Level.FINE)) {
        // No partition id is available in this method, so the message does not include one.
        LOGGER.fine(
                "\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId()
                        + ") - (level " + level + ")"
                        + " - BuildSize:\t" + buildPartSize + "\tProbeSize:\t" + probePartSize
                        + " - MemForJoin "
                        + (state.memForJoin)
                        + " - LeftOuter is " + isLeftOuter);
    }
    //Apply in-Mem HJ if possible
    if (!skipInMemoryHJ && ((buildPartSize < state.memForJoin)
            || (probePartSize < state.memForJoin && !isLeftOuter))) {
        int tabSize = -1;
        if (!forceRR && (isLeftOuter || (buildPartSize
                < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
                        + level + "]");
            }
            tabSize = buildSizeInTuple;
            if (tabSize == 0) {
                throw new HyracksDataException(
                        "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
            }
            //Build Side is smaller
            applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, buildHpc,
                    probeHpc, buildSideReader, probeSideReader); // checked-confirmed
        } else { //Case 1.2 - InMemHJ with Role Reversal
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.fine(
                        "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
                                + level + "]");
            }
            tabSize = probeSizeInTuple;
            if (tabSize == 0) {
                throw new HyracksDataException(
                        "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
            }
            //Probe Side is smaller
            applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, probeHpc,
                    buildHpc, probeSideReader, buildSideReader); // checked-confirmed
        }
    }
    //Apply (Recursive) HHJ
    else {
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
        }
        if (!forceRR && (isLeftOuter
                || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                        + level + "]");
            }
            applyHybridHashJoin((int) buildPartSize, PROBE_REL, BUILD_REL, probeKeys, buildKeys,
                    probeRd, buildRd, probeHpc, buildHpc, probeSideReader, buildSideReader, level,
                    beforeMax);
        } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.fine(
                        "\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level " + level + "]");
            }
            // Arguments are swapped so that the original probe side takes the build role.
            applyHybridHashJoin((int) probePartSize, BUILD_REL, PROBE_REL, buildKeys, probeKeys,
                    buildRd, probeRd, buildHpc, probeHpc, buildSideReader, probeSideReader, level,
                    beforeMax);
        }
    }
}
/**
 * Runs one more round of hybrid hash join over a pair of run files and then processes the
 * partitions flagged in the new join's partition status.
 * <p>
 * The parameters describe the roles for this round, which may be swapped with respect to
 * the operator's original build/probe sides; the swap is detected by comparing the key
 * arrays against the descriptor's own {@code buildKeys}/{@code probeKeys}.
 * After the round, if {@code forceNLJ} is not set and the largest resulting partition
 * (in tuples) is smaller than {@code NLJ_SWITCH_THRESHOLD * beforeMax}, each remaining pair
 * is handed back to {@code joinPartitionPair} at {@code level + 1}; otherwise each remaining
 * pair is joined with a nested loop join.
 * <p>
 * Both run file readers are closed even if building or probing throws.
 *
 * @param tableSize       size estimate used to compute the number of partitions
 * @param PROBE_REL       name of the relation playing the probe role in this round
 * @param BUILD_REL       name of the relation playing the build role in this round
 * @param probeKeys       key fields of the probe role
 * @param buildKeys       key fields of the build role
 * @param probeRd         record descriptor of the probe role
 * @param buildRd         record descriptor of the build role
 * @param probeHpc        partition computer for the probe role
 * @param buildHpc        partition computer for the build role
 * @param probeSideReader run file read during the probe step
 * @param buildSideReader run file read during the build step
 * @param level           current recursion level
 * @param beforeMax       larger of the two partition sizes (in tuples) before this round
 * @throws HyracksDataException if reading, building, probing or a nested join fails
 */
private void applyHybridHashJoin(int tableSize,
        final String PROBE_REL, final String BUILD_REL,
        final int[] probeKeys, final int[] buildKeys,
        final RecordDescriptor probeRd, final RecordDescriptor buildRd,
        final ITuplePartitionComputer probeHpc, final ITuplePartitionComputer buildHpc,
        RunFileReader probeSideReader, RunFileReader buildSideReader,
        final int level, final long beforeMax)
        throws HyracksDataException {
    // Roles are reversed when this round's probe keys are the descriptor's build keys and vice versa.
    boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
            && buildKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
    assert !isLeftOuter || !isReversed : "LeftOut Join can not reverse roles";
    OptimizedHybridHashJoin rHHj;
    int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor,
            nPartitions);
    rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL,
            probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc,
            predEvaluator, isLeftOuter, nullWriterFactories1); //checked-confirmed
    rHHj.setIsReversed(isReversed);

    // Build step: the reader is released even when the build fails part-way.
    buildSideReader.open();
    try {
        rHHj.initBuild();
        rPartbuff.reset();
        while (buildSideReader.nextFrame(rPartbuff)) {
            rHHj.build(rPartbuff.getBuffer());
        }
        rHHj.closeBuild();
    } finally {
        buildSideReader.close();
    }

    // Probe step: same guarantee for the probe side reader.
    probeSideReader.open();
    try {
        rHHj.initProbe();
        rPartbuff.reset();
        while (probeSideReader.nextFrame(rPartbuff)) {
            rHHj.probe(rPartbuff.getBuffer(), writer);
        }
        rHHj.closeProbe(writer);
    } finally {
        probeSideReader.close();
    }

    int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
    int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
    int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
    BitSet rPStatus = rHHj.getPartitionStatus();
    if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
            * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine(
                    "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                            + level + "]");
        }
        for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
            RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
            RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
            int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
            int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
            if (rbrfw == null || rprfw == null) {
                if (isLeftOuter && rprfw != null) { // if outer join, we don't reverse
                    appendNullToProbeTuples(rprfw);
                }
                continue;
            }
            // joinPartitionPair expects the original build side first, so undo the role swap.
            if (isReversed) {
                joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
            } else {
                joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
            }
        }
    } else { //Case 2.1.2 - Switch to NLJ
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine(
                    "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
                            + level + "]");
        }
        for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
            RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
            RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
            if (rbrfw == null || rprfw == null) {
                if (isLeftOuter && rprfw != null) { // if outer join, we don't reverse
                    appendNullToProbeTuples(rprfw);
                }
                continue;
            }
            int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
            int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
            // NLJ order is outer + inner, the order is reversed from the other joins
            if (isLeftOuter || probeSideInTups < buildSideInTups) {
                applyNestedLoopJoin(probeRd, buildRd, frameLimit, rprfw, rbrfw); //checked-modified
            } else {
                applyNestedLoopJoin(buildRd, probeRd, frameLimit, rbrfw, rprfw); //checked-modified
            }
        }
    }
}
/**
 * Emits every tuple of a probe run file concatenated with the pre-built null tuple
 * ({@code nullTupleBuild}), then flushes the appender to the downstream writer.
 * <p>
 * The appender and the probe tuple accessor are created on first use and reused afterwards.
 * The reader is closed even if reading or appending throws.
 *
 * @param probReader run file holding the probe tuples to emit
 * @throws HyracksDataException if reading the run file or writing downstream fails
 */
private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException {
    if (nullResultAppender == null) {
        nullResultAppender = new FrameTupleAppender(new VSizeFrame(ctx));
    }
    if (probeTupleAccessor == null) {
        probeTupleAccessor = new FrameTupleAccessor(probeRd);
    }
    probReader.open();
    try {
        while (probReader.nextFrame(rPartbuff)) {
            probeTupleAccessor.reset(rPartbuff.getBuffer());
            for (int tid = 0; tid < probeTupleAccessor.getTupleCount(); tid++) {
                FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor, tid,
                        nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0,
                        nullTupleBuild.getSize());
            }
        }
    } finally {
        // Release the run file handle on both the normal and the failure path.
        probReader.close();
    }
    nullResultAppender.write(writer, true);
}
/**
 * Joins a pair of run files with an in-memory hash join: every frame of {@code bReader} is
 * copied into a freshly allocated frame and added to the hash table, then every frame of
 * {@code pReader} is probed against it and the join is closed.
 * <p>
 * The roles passed in may be swapped with respect to the operator's original build/probe
 * sides; the swap is detected by comparing the key arrays against the descriptor's own
 * {@code buildKeys}/{@code probeKeys} and forwarded to the joiner.
 * Both readers are closed even if building or probing throws.
 *
 * @param bKeys       key fields of the side loaded into the table
 * @param pKeys       key fields of the side probed against the table
 * @param tabSize     size passed to the hash table and the joiner
 * @param buildRDesc  record descriptor of the table side
 * @param probeRDesc  record descriptor of the probing side
 * @param hpcRepBuild partition computer for the table side
 * @param hpcRepProbe partition computer for the probing side
 * @param bReader     run file of the table side
 * @param pReader     run file of the probing side
 * @throws HyracksDataException if reading, building or joining fails
 */
private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
        RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepBuild,
        ITuplePartitionComputer hpcRepProbe, RunFileReader bReader, RunFileReader pReader)
        throws HyracksDataException {
    boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
            && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
    assert !isLeftOuter || !isReversed : "LeftOut Join can not reverse roles";
    ISerializableTable table = new SerializableHashTable(tabSize, ctx);
    InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tabSize, new FrameTupleAccessor(probeRDesc),
            hpcRepProbe, new FrameTupleAccessor(buildRDesc), hpcRepBuild,
            new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nullWriters1, table,
            predEvaluator, isReversed);

    // Build
    bReader.open();
    try {
        rPartbuff.reset();
        while (bReader.nextFrame(rPartbuff)) {
            //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
            ByteBuffer copyBuffer = ctx.allocateFrame(rPartbuff.getFrameSize());
            FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
            joiner.build(copyBuffer);
            rPartbuff.reset();
        }
    } finally {
        bReader.close();
    }
    rPartbuff.reset();

    // Probe
    pReader.open();
    try {
        while (pReader.nextFrame(rPartbuff)) {
            joiner.join(rPartbuff.getBuffer(), writer);
            rPartbuff.reset();
        }
    } finally {
        pReader.close();
    }
    joiner.closeJoin(writer);
}
/**
 * Joins a pair of run files with a nested loop join: all frames of {@code innerReader} are
 * cached in the join first, then every frame of {@code outerReader} is joined against the
 * cache and the join is closed.
 * <p>
 * The nested loop join result is outer + inner, whereas all the other operators produce
 * probe + build, so the reversed relation is defined differently here: roles count as
 * reversed when the outer side carries the build record descriptor and the inner side the
 * probe record descriptor. The matching tuple-pair comparator is chosen accordingly.
 * Both readers are closed (outer first, then inner) even if caching or joining throws.
 *
 * @param outerRd     record descriptor of the outer side
 * @param innerRd     record descriptor of the inner (cached) side
 * @param memorySize  memory budget passed to the nested loop join
 * @param outerReader run file of the outer side
 * @param innerReader run file of the inner side
 * @throws HyracksDataException if reading, caching or joining fails
 */
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
        RunFileReader outerReader, RunFileReader innerReader)
        throws HyracksDataException {
    // The nested loop join result is outer + inner. All the other operator is probe + build. Hence the reverse relation is different
    boolean isReversed = outerRd == buildRd && innerRd == probeRd;
    assert !isLeftOuter || !isReversed : "LeftOut Join can not reverse roles";
    ITuplePairComparator nljComptorOuterInner = isReversed ?
            nljComparatorBuild2Probe :
            nljComparatorProbe2Build;
    NestedLoopJoin nlj = new NestedLoopJoin(ctx,
            new FrameTupleAccessor(outerRd),
            new FrameTupleAccessor(innerRd), nljComptorOuterInner, memorySize,
            predEvaluator, isLeftOuter, nullWriters1);
    nlj.setIsReversed(isReversed);

    IFrame cacheBuff = new VSizeFrame(ctx);
    innerReader.open();
    try {
        while (innerReader.nextFrame(cacheBuff)) {
            nlj.cache(cacheBuff.getBuffer());
            cacheBuff.reset();
        }
        nlj.closeCache();

        IFrame joinBuff = new VSizeFrame(ctx);
        outerReader.open();
        try {
            while (outerReader.nextFrame(joinBuff)) {
                nlj.join(joinBuff.getBuffer(), writer);
                joinBuff.reset();
            }
            nlj.closeJoin(writer);
        } finally {
            outerReader.close();
        }
    } finally {
        // The inner reader stays open until the join is done and is closed last.
        innerReader.close();
    }
}
};
return op;
}