in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java [519:711]
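  // Drives the hash join: on the first call this runs the build phase, then
  // probes and projects output batches; spilled partitions are handled
  // "recursively" by re-entering innerNext() with the spill files as incoming.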
public IterOutcome innerNext() {
if (wasKilled) {
// We have received a cancel signal. We need to stop processing.
cleanup();
return IterOutcome.NONE;
}
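    // Fetch the first build (right) side batch, if it has not been prefetched
    // already; rightUpstream reflects the outcome of that fetch.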
prefetchFirstBuildBatch();
if (rightUpstream.isError()) {
      // A termination condition was reached while prefetching the first
      // data-holding batch on the build side, so terminate.
return rightUpstream;
}
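    // Set when an exception unwinds this method, so that the finally block
    // below can release the memory held by any spilled record batches.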
    boolean exceptionOccurred = false;
try {
      /*
       * If we are here for the first time, execute the build phase of the
       * hash join and set up the runtime-generated class for the probe side
       */
if (state == BatchState.FIRST) {
// Build the hash table, using the build side record batches.
IterOutcome buildExecuteTermination = executeBuildPhase();
if (buildExecuteTermination != null) {
          // A termination condition was reached while executing the build
          // phase, so terminate.
return buildExecuteTermination;
}
buildComplete = true;
if (isRowKeyJoin) {
// discard the first left batch which was fetched by buildSchema, and
// get the new one based on rowkey join
leftUpstream = next(left);
}
// Update the hash table related stats for the operator
updateStats();
}
// Try to probe and project, or recursively handle a spilled partition
if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
joinIsLeftOrFull) { // or if this is a left/full outer join
prefetchFirstProbeBatch();
if (leftUpstream.isError()
|| (leftUpstream == NONE && !joinIsRightOrFull)) {
        // A termination condition was reached while prefetching the first
        // data-holding batch on the probe side, so terminate.
return leftUpstream;
}
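      // Probe only when at least one side has data; otherwise fall through
      // to the cleanup below.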
if (!buildSideIsEmpty.booleanValue()
|| !probeSideIsEmpty.booleanValue()) {
// Only allocate outgoing vectors and execute probing logic if there
// is data
if (state == BatchState.FIRST) {
setupProbe();
}
// Allocate the memory for the vectors in the output container
batchMemoryManager.allocateVectors(container);
probe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
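          // Probe the hash table with the incoming probe rows and project the
          // joined rows into the output container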
outputRecords = probe.probeAndProject();
container.setValueCount(outputRecords);
batchMemoryManager.updateOutgoingStats(outputRecords);
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this,
getRecordBatchStatsContext());
          /*
           * We are here because of one of the following:
           * 1. We completed processing all of the records and we are done
           * 2. We've filled up the outgoing batch to the maximum and need to
           *    return upstream
           * In either case build the output container's schema and return
           */
if (outputRecords > 0 || state == BatchState.FIRST) {
state = BatchState.NOT_FIRST;
return IterOutcome.OK;
}
}
// Free all partitions' in-memory data structures
// (In case need to start processing spilled partitions)
for (HashPartition partn : partitions) {
partn.cleanup(false); // clean, but do not delete the spill files !!
}
//
// (recursively) Handle the spilled partitions, if any
//
if (!buildSideIsEmpty.booleanValue()) {
while (!spilledState.isEmpty()) { // "while" is only used for
// skipping; see "continue" below
// Get the next (previously) spilled partition to handle as incoming
SpilledPartition currSp = spilledState
.getNextSpilledPartition();
// If the outer is empty (and it's not a right/full join) - try the
// next spilled partition
if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
continue;
}
// Create a BUILD-side "incoming" out of the inner spill file of
// that partition
buildBatch = new SpilledRecordBatch(currSp.innerSpillFile,
currSp.innerSpilledBatches, context, buildSchema, oContext,
spillSet);
// The above ctor call also got the first batch; need to update the
// outcome
rightUpstream = ((SpilledRecordBatch) buildBatch)
.getInitialOutcome();
if (currSp.outerSpilledBatches > 0) {
// Create a PROBE-side "incoming" out of the outer spill file of
// that partition
probeBatch = new SpilledRecordBatch(currSp.outerSpillFile,
currSp.outerSpilledBatches, context, probeSchema, oContext,
spillSet);
// The above ctor call also got the first batch; need to update
// the outcome
leftUpstream = ((SpilledRecordBatch) probeBatch)
.getInitialOutcome();
} else {
probeBatch = left; // if no outer batch then reuse left - needed
// for updateIncoming()
leftUpstream = IterOutcome.NONE;
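              // With no outer rows to probe, move the probe directly to its
              // final state (emitting any unmatched build-side rows, which
              // matters for right/full outer joins)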
probe.changeToFinalProbeState();
}
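            // Advance the spill "cycle" bookkeeping and the operator stats to
            // reflect that this spilled partition is now being processed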
spilledState.updateCycle(stats, currSp, spilledStateUpdater);
            // TODO: determine whether this is still necessary, since
            // prefetchFirstBatchFromBothSides already sets this
            state = BatchState.FIRST;
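            // Reset the prefetch flags so the spilled "incoming" batches
            // created above are prefetched like regular incoming batches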
prefetchedBuild.setValue(false);
prefetchedProbe.setValue(false);
return innerNext(); // start processing the next spilled partition
// "recursively"
}
}
} else {
// Our build side is empty, we won't have any matches, clear the probe
// side
killAndDrainLeftUpstream();
}
// No more output records, clean up and return
state = BatchState.DONE;
cleanup();
return IterOutcome.NONE;
} catch (SchemaChangeException e) {
throw UserException.schemaChangeError(e).build(logger);
    } catch (OutOfMemoryException oom) {
      exceptionOccurred = true;
      throw UserException.memoryError(oom).build(logger);
    } catch (Exception e) {
      // Catch-all: internal code may catch an OutOfMemoryException and rethrow
      // it wrapped in some other exception type, so handle those here as well
      exceptionOccurred = true;
      throw UserException.executionError(e).build(logger);
    } finally {
      // On exception, release the memory still held by any spilled record
      // batches (the instanceof checks below are false for null)
      if (exceptionOccurred && buildBatch instanceof SpilledRecordBatch) {
        // release buildBatch spill memory
        buildBatch.cancel();
      }
      if (exceptionOccurred && probeBatch instanceof SpilledRecordBatch) {
        // release probeBatch spill memory
        probeBatch.cancel();
      }
}
}