in hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java [367:859]
/**
 * Creates the runtime for the probe-and-join side of the hybrid hash join for one partition.
 * <p>
 * The returned pushable receives probe frames, probes them against the join state that is looked
 * up in {@code open()}, and joins the remaining partition pairs in {@code close()}.
 *
 * @param ctx                task context; used to look up the shared state object and the joblet context
 * @param recordDescProvider supplies the record descriptors of the build and probe inputs
 * @param partition          partition number; part of the task id used to look up the state object
 * @param nPartitions        partition count; passed to {@code getNumberOfPartitions} when a
 *                           recursive hybrid hash join is set up
 * @return the operator runtime defined by the anonymous class below
 * @throws HyracksDataException if a comparator or missing writer cannot be created or the
 *                              missing-value tuple cannot be written
 */
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
// Record layout of the build input (input 0 of the build activity) and of the probe input
// (input 0 of this activity).
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
// Two comparators, one per factory: probComp is used when the roles are not reversed,
// buildComp when they are (see joinPartitionPair / applyNestedLoopJoin).
final ITuplePairComparator probComp = tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
final ITuplePairComparator buildComp = tuplePairComparatorFactoryBuild2Probe.createTuplePairComparator(ctx);
// Only a left outer join needs missing writers and the pre-built tuple of missing values;
// both stay null otherwise.
final IMissingWriter[] nonMatchWriter =
isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
final ArrayTupleBuilder nullTupleBuild =
isLeftOuter ? new ArrayTupleBuilder(buildRd.getFieldCount()) : null;
if (isLeftOuter) {
// Write one missing value per writer into nullTupleBuild; appendNullToProbeTuples later
// concatenates this tuple to probe tuples that have no build-side partition.
DataOutput out = nullTupleBuild.getDataOutput();
for (int i = 0; i < nonMatchWriterFactories.length; i++) {
nonMatchWriter[i] = nonMatchWriterFactories[i].createMissingWriter();
nonMatchWriter[i].writeMissing(out);
nullTupleBuild.addFieldEndOffset();
}
}
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
// Assigned in open() from the task context's state object; null until then.
private BuildAndPartitionTaskState state;
// Scratch frame reused for reading run files in the join helpers below.
private IFrame rPartbuff = new VSizeFrame(jobletCtx);
// Both created lazily on first use in appendNullToProbeTuples.
private FrameTupleAppender nullResultAppender = null;
private FrameTupleAccessor probeTupleAccessor;
// Set by fail(); makes close() skip the join work and only clean up.
private boolean failed = false;
// Remains null unless setOperatorStats is called.
IOperatorStats stats = null;
/**
 * Looks up the state object registered under the build-and-partition task id of this partition,
 * opens the downstream writer and initializes the probe phase of the hybrid hash join.
 *
 * @throws HyracksDataException if opening the writer or initializing the probe fails
 */
@Override
public void open() throws HyracksDataException {
    final ActivityId buildActivityId = new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID);
    final TaskId buildTaskId = new TaskId(buildActivityId, partition);
    state = (BuildAndPartitionTaskState) ctx.getStateObject(buildTaskId);
    writer.open();
    state.hybridHJ.initProbe(probComp);
    // stats may still be null here if setOperatorStats has not been called.
    state.hybridHJ.setOperatorStats(stats);
}
/**
 * Probes one incoming frame against the hybrid hash join; any results are pushed to the
 * downstream writer by the join itself.
 *
 * @param buffer the probe-side frame to process
 * @throws HyracksDataException if probing fails
 */
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
    state.hybridHJ.probe(buffer, writer);
}
/**
 * Records the failure, notifies the hybrid hash join (when it exists) and propagates the failure
 * to the downstream writer.
 * <p>
 * The downstream writer is failed in a {@code finally} block so that it still receives the
 * failure signal when the join's own failure handling throws.
 *
 * @throws HyracksDataException if the join or the writer reports an error while failing
 */
@Override
public void fail() throws HyracksDataException {
    // close() checks this flag and then only cleans up temp files and closes the writer.
    failed = true;
    try {
        // state is null if open() did not get as far as assigning it.
        if (state != null && state.hybridHJ != null) {
            state.hybridHJ.fail();
        }
    } finally {
        writer.fail();
    }
}
/**
 * Finishes the join and closes the downstream writer.
 * <p>
 * If {@code fail()} was called before, only the join's temp files are cleared and the writer is
 * closed. Otherwise the probe is completed and every partition whose bit is set in the join's
 * partition status is processed: when both a build and a probe run file exist they are joined
 * through {@code joinPartitionPair} at level 1; when one side is missing the pair is skipped,
 * except that for a left outer join the probe tuples are emitted padded with missing values.
 * <p>
 * On an exception the join and the writer are failed, the temp files are cleared, the writer is
 * closed, and the exception is rethrown wrapped by {@code HyracksDataException.create}.
 *
 * @throws HyracksDataException if joining, cleanup or closing the writer fails
 */
@Override
public void close() throws HyracksDataException {
    if (failed) {
        try {
            // Clear temp files if fail() was called.
            clearJoinTempFiles();
        } finally {
            writer.close(); // writer should always be closed.
        }
        return;
    }
    Throwable ex = null;
    try {
        try {
            state.hybridHJ.completeProbe(writer);
        } finally {
            state.hybridHJ.releaseResource();
        }
        BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
        rPartbuff.reset();
        for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid =
                partitionStatus.nextSetBit(pid + 1)) {
            RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
            RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
            if (bReader == null || pReader == null) {
                // One side has no run file: nothing can match. A left outer join still has to
                // output the probe tuples, padded with missing values.
                if (isLeftOuter && pReader != null) {
                    appendNullToProbeTuples(pReader);
                }
                if (bReader != null) {
                    bReader.close();
                }
                if (pReader != null) {
                    pReader.close();
                }
                continue;
            }
            int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
            int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
            joinPartitionPair(bReader, pReader, bSize, pSize, 1);
        }
    } catch (Exception e) {
        ex = e;
        try {
            if (state != null && state.hybridHJ != null) {
                state.hybridHJ.fail();
            }
        } finally {
            try {
                // Since writer.nextFrame() is called in the above "try" body, we have to call
                // writer.fail() to send the failure signal to the downstream; this must happen
                // even when failing the join itself throws.
                CleanupUtils.fail(writer, ex);
            } finally {
                // Clear temp files here: neither this.fail() nor this.close() will be called
                // again after close().
                clearJoinTempFiles();
            }
        }
    } finally {
        ex = CleanupUtils.close(writer, ex);
    }
    if (ex != null) {
        throw HyracksDataException.create(ex);
    }
}

/**
 * Clears the build and probe temp files of the top-level hybrid hash join, if it exists. The
 * probe files are cleared even when clearing the build files throws.
 */
private void clearJoinTempFiles() throws HyracksDataException {
    if (state == null || state.hybridHJ == null) {
        return;
    }
    try {
        state.hybridHJ.clearBuildTempFiles();
    } finally {
        state.hybridHJ.clearProbeTempFiles();
    }
}
/**
 * Stores the statistics collector used by this runtime; {@code open()} hands it on to the
 * hybrid hash join.
 *
 * @param stats the statistics object to record into
 */
@Override
public void setOperatorStats(IOperatorStats stats) {
    this.stats = stats;
}
/**
 * Joins one (build, probe) pair of partition run files.
 * <p>
 * The size of each side is estimated in frames (file size divided by the initial frame size,
 * rounded up) plus the expected hash table size for its tuple count. If the build side fits below
 * {@code state.memForJoin}, or the probe side does and the join is not left outer, an in-memory
 * hash join is applied; otherwise a (recursive) hybrid hash join is applied. In both cases the
 * roles are reversed only when the join is not left outer and the build side is not smaller than
 * the probe side in frames.
 * <p>
 * The buildSideReader should be always the original buildSideReader, so should the probeSideReader.
 *
 * @param buildSideReader  run file of the original build side
 * @param probeSideReader  run file of the original probe side
 * @param buildSizeInTuple number of tuples in the build partition
 * @param probeSizeInTuple number of tuples in the probe partition
 * @param level            recursion level; also selects the hash partitioner of the family
 * @throws HyracksDataException if the chosen side of an in-memory join is empty or a join fails
 */
private void joinPartitionPair(RunFileReader buildSideReader, RunFileReader probeSideReader,
        int buildSizeInTuple, int probeSizeInTuple, int level) throws HyracksDataException {
    ITuplePartitionComputer probeHpc =
            new FieldHashPartitionComputerFamily(probeKeys, propHashFunctionFactories)
                    .createPartitioner(level);
    ITuplePartitionComputer buildHpc =
            new FieldHashPartitionComputerFamily(buildKeys, buildHashFunctionFactories)
                    .createPartitioner(level);
    int frameSize = jobletCtx.getInitialFrameSize();
    long buildPartSize = (long) Math.ceil((double) buildSideReader.getFileSize() / (double) frameSize);
    long probePartSize = (long) Math.ceil((double) probeSideReader.getFileSize() / (double) frameSize);
    int beforeMax = Math.max(buildSizeInTuple, probeSizeInTuple);
    // Track the deepest recursion level reached. stats stays null unless setOperatorStats was
    // called, so guard it the same way the probe loop of applyInMemHashJoin does.
    if (stats != null && stats.getLevel().get() < level) {
        stats.getLevel().set(level);
    }
    if (LOGGER.isTraceEnabled()) {
        LOGGER.trace(
                "\n>>>Joining Partition Pairs (thread_id {}) (pid ) - (level {}) - BuildSize:\t{}\tProbeSize:\t{} - MemForJoin {} - LeftOuter is {}",
                Thread.currentThread().getId(), level, buildPartSize, probePartSize, state.memForJoin,
                isLeftOuter);
    }
    // Calculate the expected hash table size for the both side.
    long expectedHashTableSizeForBuildInFrame =
            SerializableHashTable.getExpectedTableFrameCount(buildSizeInTuple, frameSize);
    long expectedHashTableSizeForProbeInFrame =
            SerializableHashTable.getExpectedTableFrameCount(probeSizeInTuple, frameSize);
    //Apply in-Mem HJ if possible
    if (((buildPartSize + expectedHashTableSizeForBuildInFrame < state.memForJoin)
            || (probePartSize + expectedHashTableSizeForProbeInFrame < state.memForJoin
                    && !isLeftOuter))) {
        // NOTE(review): the side loaded into memory below is picked by comparing the partition
        // sizes only, not by which side passed the memory check above - confirm that a side
        // that did not pass the check can never be the one selected.
        int tabSize = -1;
        if ((isLeftOuter || (buildPartSize < probePartSize))) {
            //Case 1.1 - InMemHJ (without Role-Reversal)
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(
                        "\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level {}]",
                        level);
            }
            tabSize = buildSizeInTuple;
            if (tabSize == 0) {
                throw new HyracksDataException(
                        "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
            }
            //Build Side is smaller
            applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, buildHpc, probeHpc,
                    buildSideReader, probeSideReader, probComp); // checked-confirmed
        } else { //Case 1.2 - InMemHJ with Role Reversal
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(
                        "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJWITH RoleReversal - [Level {}]",
                        level);
            }
            tabSize = probeSizeInTuple;
            if (tabSize == 0) {
                throw new HyracksDataException(
                        "Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
            }
            //Probe Side is smaller
            applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, probeHpc, buildHpc,
                    probeSideReader, buildSideReader, buildComp); // checked-confirmed
        }
    }
    //Apply (Recursive) HHJ
    else {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("\t>>>Case 2. ApplyRecursiveHHJ - [Level {}]", level);
        }
        if ((isLeftOuter || buildPartSize < probePartSize)) {
            //Case 2.1 - Recursive HHJ (without Role-Reversal)
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(
                        "\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
                        level);
            }
            applyHybridHashJoin((int) buildPartSize, PROBE_REL, BUILD_REL, probeKeys, buildKeys,
                    probeRd, buildRd, probeHpc, buildHpc, probeSideReader, buildSideReader, level,
                    beforeMax, probComp);
        } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level {}]", level);
            }
            applyHybridHashJoin((int) probePartSize, BUILD_REL, PROBE_REL, buildKeys, probeKeys,
                    buildRd, probeRd, buildHpc, probeHpc, buildSideReader, probeSideReader, level,
                    beforeMax, buildComp);
        }
    }
}
/**
 * Runs one more round of hybrid hash join over a partition pair that is too large for an
 * in-memory join, then processes the partitions of that round recursively.
 * <p>
 * The "build" and "probe" parameters describe the roles for this round; when the caller passes
 * the descriptor's build keys as {@code probeKeys} and its probe keys as {@code buildKeys}, the
 * roles are reversed. After the build and probe phases, the largest resulting partition is
 * compared with {@code beforeMax}: if it is smaller than {@code NLJ_SWITCH_THRESHOLD * beforeMax}
 * the partitions are joined through {@code joinPartitionPair} at {@code level + 1}, otherwise
 * they are joined with a nested loop join.
 * <p>
 * If any phase throws, the temp files of this round's join are cleared before the exception is
 * rethrown; a failure of that cleanup is attached to the exception as suppressed rather than
 * replacing it.
 *
 * @param tableSize       size estimate passed to {@code getNumberOfPartitions}
 * @param PROBE_REL       name of the relation acting as probe side in this round
 * @param BUILD_REL       name of the relation acting as build side in this round
 * @param probeKeys       key fields of the side acting as probe
 * @param buildKeys       key fields of the side acting as build
 * @param probeRd         record descriptor of the side acting as probe
 * @param buildRd         record descriptor of the side acting as build
 * @param probeHpc        partition computer for the side acting as probe
 * @param buildHpc        partition computer for the side acting as build
 * @param probeSideReader run file of the side acting as probe
 * @param buildSideReader run file of the side acting as build
 * @param level           current recursion level
 * @param beforeMax       larger of the two partition sizes (in tuples) before this round
 * @param comp            comparator used for probing in this round
 * @throws HyracksDataException if building, probing or a nested join fails
 */
private void applyHybridHashJoin(int tableSize, final String PROBE_REL, final String BUILD_REL,
        final int[] probeKeys, final int[] buildKeys, final RecordDescriptor probeRd,
        final RecordDescriptor buildRd, final ITuplePartitionComputer probeHpc,
        final ITuplePartitionComputer buildHpc, RunFileReader probeSideReader,
        RunFileReader buildSideReader, final int level, final long beforeMax, ITuplePairComparator comp)
        throws HyracksDataException {
    boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
            && buildKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
    assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
    int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
    final OptimizedHybridHashJoin rHHj = new OptimizedHybridHashJoin(jobletCtx, state.memForJoin, n,
            PROBE_REL, BUILD_REL, probeRd, buildRd, probeHpc, buildHpc, null, null, isLeftOuter,
            nonMatchWriterFactories); //checked-confirmed
    rHHj.setIsReversed(isReversed);
    try {
        // Build phase: feed the build-side run file into the recursive join.
        try {
            buildSideReader.open();
            try {
                rHHj.initBuild();
                rPartbuff.reset();
                while (buildSideReader.nextFrame(rPartbuff)) {
                    rHHj.build(rPartbuff.getBuffer());
                }
            } finally {
                // Makes sure that files are always properly closed.
                rHHj.closeBuild();
            }
        } finally {
            buildSideReader.close();
        }
        // Probe phase: stream the probe-side run file through the recursive join.
        try {
            probeSideReader.open();
            rPartbuff.reset();
            try {
                rHHj.initProbe(comp);
                while (probeSideReader.nextFrame(rPartbuff)) {
                    rHHj.probe(rPartbuff.getBuffer(), writer);
                }
                rHHj.completeProbe(writer);
            } finally {
                rHHj.releaseResource();
            }
        } finally {
            // Makes sure that files are always properly closed.
            probeSideReader.close();
        }
        // Recursion phase: handle the partitions flagged in this round's partition status.
        int maxAfterBuildSize = rHHj.getMaxBuildPartitionSize();
        int maxAfterProbeSize = rHHj.getMaxProbePartitionSize();
        int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
        BitSet rPStatus = rHHj.getPartitionStatus();
        if ((afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
            //Case 2.1.1 - Keep applying HHJ
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(
                        "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
                        level);
            }
            for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
                int rbSizeInTuple = rHHj.getBuildPartitionSizeInTup(rPid);
                int rpSizeInTuple = rHHj.getProbePartitionSizeInTup(rPid);
                if (rbrfw == null || rprfw == null) {
                    if (isLeftOuter && rprfw != null) {
                        // For the outer join, we don't reverse the role.
                        appendNullToProbeTuples(rprfw);
                    }
                    if (rbrfw != null) {
                        rbrfw.close();
                    }
                    if (rprfw != null) {
                        rprfw.close();
                    }
                    continue;
                }
                // joinPartitionPair always expects (original build, original probe); when the
                // roles were reversed in this round, swap the readers and sizes back.
                if (isReversed) {
                    joinPartitionPair(rprfw, rbrfw, rpSizeInTuple, rbSizeInTuple, level + 1);
                } else {
                    joinPartitionPair(rbrfw, rprfw, rbSizeInTuple, rpSizeInTuple, level + 1);
                }
            }
        } else { //Case 2.1.2 - Switch to NLJ
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace(
                        "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
                        level);
            }
            for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
                RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
                if (rbrfw == null || rprfw == null) {
                    if (isLeftOuter && rprfw != null) {
                        // For the outer join, we don't reverse the role.
                        appendNullToProbeTuples(rprfw);
                    }
                    if (rbrfw != null) {
                        rbrfw.close();
                    }
                    if (rprfw != null) {
                        rprfw.close();
                    }
                    continue;
                }
                int buildSideInTups = rHHj.getBuildPartitionSizeInTup(rPid);
                int probeSideInTups = rHHj.getProbePartitionSizeInTup(rPid);
                // NLJ order is outer + inner, the order is reversed from the other joins
                if (isLeftOuter || probeSideInTups < buildSideInTups) {
                    //checked-modified
                    applyNestedLoopJoin(probeRd, buildRd, memSizeInFrames, rprfw, rbrfw);
                } else {
                    //checked-modified
                    applyNestedLoopJoin(buildRd, probeRd, memSizeInFrames, rbrfw, rprfw);
                }
            }
        }
    } catch (Exception e) {
        // Make sure that temporary run files generated in recursive hybrid hash joins
        // are closed and deleted, whichever phase failed. Cleanup problems are recorded on
        // the original exception so that the root cause is the one that propagates.
        try {
            rHHj.clearBuildTempFiles();
        } catch (Exception cleanupFailure) {
            e.addSuppressed(cleanupFailure);
        }
        try {
            rHHj.clearProbeTempFiles();
        } catch (Exception cleanupFailure) {
            e.addSuppressed(cleanupFailure);
        }
        throw e;
    }
}
/**
 * Emits every tuple of a probe run file concatenated with the pre-built tuple of missing values
 * ({@code nullTupleBuild}), then flushes the appender to the downstream writer. Used for a left
 * outer join when a probe partition has no build-side run file.
 * <p>
 * The appender and the probe tuple accessor are created on first use and reused afterwards. The
 * reader is opened here and always closed before returning.
 *
 * @param probReader run file holding the probe tuples to emit
 * @throws HyracksDataException if reading the run file or writing downstream fails
 */
private void appendNullToProbeTuples(RunFileReader probReader) throws HyracksDataException {
    if (nullResultAppender == null) {
        nullResultAppender = new FrameTupleAppender(new VSizeFrame(jobletCtx));
    }
    if (probeTupleAccessor == null) {
        probeTupleAccessor = new FrameTupleAccessor(probeRd);
    }
    try {
        probReader.open();
        boolean frameRead = probReader.nextFrame(rPartbuff);
        while (frameRead) {
            probeTupleAccessor.reset(rPartbuff.getBuffer());
            final int tupleCount = probeTupleAccessor.getTupleCount();
            for (int tupleIndex = 0; tupleIndex < tupleCount; tupleIndex++) {
                FrameUtils.appendConcatToWriter(writer, nullResultAppender, probeTupleAccessor,
                        tupleIndex, nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(),
                        0, nullTupleBuild.getSize());
            }
            frameRead = probReader.nextFrame(rPartbuff);
        }
        // Push out whatever is still buffered in the appender.
        nullResultAppender.write(writer, true);
    } finally {
        probReader.close();
    }
}
/**
 * Joins a partition pair entirely in memory: the build-side run file is loaded into a hash
 * table backed by a frame pool of {@code state.memForJoin} frames, then the probe-side run file
 * is streamed against it and the results are written downstream.
 * <p>
 * The roles are considered reversed when {@code pKeys} is the descriptor's build keys and
 * {@code bKeys} its probe keys. Both readers are opened here and always closed. The joiner's
 * memory and table are released after the probe phase, and also when the build phase fails,
 * because the probe phase and its cleanup are never reached in that case.
 *
 * @param bKeys       key fields of the side loaded into the hash table
 * @param pKeys       key fields of the side streamed against the table
 * @param tabSize     number of tuples used to size the hash table
 * @param buildRDesc  record descriptor of the side loaded into the hash table
 * @param probeRDesc  record descriptor of the streamed side
 * @param hpcRepBuild partition computer for the side loaded into the hash table
 * @param hpcRepProbe partition computer for the streamed side
 * @param bReader     run file of the side loaded into the hash table
 * @param pReader     run file of the streamed side
 * @param comp        comparator used to match tuple pairs
 * @throws HyracksDataException if no frame can be acquired for a build frame even after
 *                              compacting the hash table, or if reading or joining fails
 */
private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
        RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepBuild,
        ITuplePartitionComputer hpcRepProbe, RunFileReader bReader, RunFileReader pReader,
        ITuplePairComparator comp) throws HyracksDataException {
    boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
            && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
    assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
    IDeallocatableFramePool framePool =
            new DeallocatableFramePool(jobletCtx, state.memForJoin * jobletCtx.getInitialFrameSize());
    ISimpleFrameBufferManager bufferManager = new FramePoolBackedFrameBufferManager(framePool);
    ISerializableTable table = new SerializableHashTable(tabSize, jobletCtx, bufferManager);
    InMemoryHashJoin joiner = new InMemoryHashJoin(jobletCtx, new FrameTupleAccessor(probeRDesc),
            hpcRepProbe, new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild, isLeftOuter,
            nonMatchWriter, table, isReversed, bufferManager);
    joiner.setComparator(comp);
    boolean buildSucceeded = false;
    try {
        bReader.open();
        rPartbuff.reset();
        while (bReader.nextFrame(rPartbuff)) {
            // stats stays null unless setOperatorStats was called; guard it as the probe loop does.
            if (stats != null) {
                stats.getBytesRead().update(rPartbuff.getBuffer().limit());
            }
            // We need to allocate a copyBuffer, because this buffer gets added to the buffers list
            // in the InMemoryHashJoin.
            ByteBuffer copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
            // If a frame cannot be allocated, there may be a chance if we can compact the table,
            // one or more frame may be reclaimed.
            if (copyBuffer == null) {
                if (joiner.compactHashTable() > 0) {
                    copyBuffer = bufferManager.acquireFrame(rPartbuff.getFrameSize());
                }
                if (copyBuffer == null) {
                    // Still no frame is allocated? At this point, we have no way to get a frame.
                    throw new HyracksDataException(
                            "Can't allocate one more frame. Assign more memory to InMemoryHashJoin.");
                }
            }
            FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
            joiner.build(copyBuffer);
            rPartbuff.reset();
        }
        buildSucceeded = true;
    } finally {
        try {
            bReader.close();
        } finally {
            if (!buildSucceeded) {
                // The probe phase below will not run, so release the joiner here, in the same
                // order the probe phase uses: memory first, then the table.
                try {
                    joiner.releaseMemory();
                } finally {
                    joiner.closeTable();
                }
            }
        }
    }
    try {
        //probe
        pReader.open();
        rPartbuff.reset();
        try {
            while (pReader.nextFrame(rPartbuff)) {
                if (stats != null) {
                    stats.getBytesRead().update(rPartbuff.getBuffer().limit());
                }
                joiner.join(rPartbuff.getBuffer(), writer);
                rPartbuff.reset();
            }
            joiner.completeJoin(writer);
        } finally {
            joiner.releaseMemory();
        }
    } finally {
        try {
            pReader.close();
        } finally {
            joiner.closeTable();
        }
    }
}
/**
 * Joins two run files with a nested loop join: the inner run file is cached first, then the
 * outer run file is streamed against the cache and the results are written downstream.
 * <p>
 * The nested loop join result is outer + inner, whereas all the other operators are
 * probe + build, so "reversed" here means that the outer side is the original build side and
 * the inner side the original probe side. Both readers are opened here and always closed.
 *
 * @param outerRd     record descriptor of the outer side
 * @param innerRd     record descriptor of the inner (cached) side
 * @param memorySize  memory budget handed to the nested loop join
 * @param outerReader run file of the outer side
 * @param innerReader run file of the inner side
 * @throws HyracksDataException if reading a run file or joining fails
 */
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
        RunFileReader outerReader, RunFileReader innerReader) throws HyracksDataException {
    final boolean rolesSwapped = (outerRd == buildRd) && (innerRd == probeRd);
    final ITuplePairComparator outerInnerComparator;
    if (rolesSwapped) {
        outerInnerComparator = buildComp;
    } else {
        outerInnerComparator = probComp;
    }
    final FrameTupleAccessor outerAccessor = new FrameTupleAccessor(outerRd);
    final FrameTupleAccessor innerAccessor = new FrameTupleAccessor(innerRd);
    final NestedLoopJoin nestedLoopJoin = new NestedLoopJoin(jobletCtx, outerAccessor, innerAccessor,
            memorySize, isLeftOuter, nonMatchWriter, rolesSwapped);
    nestedLoopJoin.setComparator(outerInnerComparator);

    // Phase 1: cache the whole inner side.
    final IFrame innerFrame = new VSizeFrame(jobletCtx);
    try {
        innerReader.open();
        boolean innerFrameRead = innerReader.nextFrame(innerFrame);
        while (innerFrameRead) {
            nestedLoopJoin.cache(innerFrame.getBuffer());
            innerFrame.reset();
            innerFrameRead = innerReader.nextFrame(innerFrame);
        }
    } finally {
        try {
            nestedLoopJoin.closeCache();
        } finally {
            innerReader.close();
        }
    }

    // Phase 2: stream the outer side against the cached inner side.
    try {
        final IFrame outerFrame = new VSizeFrame(jobletCtx);
        outerReader.open();
        try {
            boolean outerFrameRead = outerReader.nextFrame(outerFrame);
            while (outerFrameRead) {
                nestedLoopJoin.join(outerFrame.getBuffer(), writer);
                outerFrame.reset();
                outerFrameRead = outerReader.nextFrame(outerFrame);
            }
            nestedLoopJoin.completeJoin(writer);
        } finally {
            nestedLoopJoin.releaseMemory();
        }
    } finally {
        outerReader.close();
    }
}
/**
 * @return the fixed display name of this probe-and-join runtime
 */
@Override
public String getDisplayName() {
    final String displayName = "Hybrid Hash Join: Probe & Join";
    return displayName;
}
};
return op;
}