in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java [981:1189]
public IterOutcome executeBuildPhase() throws SchemaChangeException {
if (buildSideIsEmpty.booleanValue()) {
// The build (right) side is empty - nothing to build
return null;
}
if (skipHashTableBuild) { // No hash table needed - then consume all the right upstream
killAndDrainRightUpstream();
return null;
}
HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
{
// Initializing build calculator
// Limit scope of these variables to this block
int maxBatchSize = spilledState.isFirstCycle()
? RecordBatch.MAX_BATCH_ROW_COUNT
: RECORDS_PER_BATCH;
boolean doMemoryCalculation = canSpill
&& !probeSideIsEmpty.booleanValue();
HashJoinMemoryCalculator calc = getCalculatorImpl();
calc.initialize(doMemoryCalculation);
buildCalc = calc.next();
buildCalc.initialize(spilledState.isFirstCycle(), true, // TODO Fix after growing hash values bug fixed
buildBatch, probeBatch, buildJoinColumns,
probeSideIsEmpty.booleanValue(), allocator.getLimit(), numPartitions,
RECORDS_PER_BATCH, RECORDS_PER_BATCH, maxBatchSize, maxBatchSize,
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);
if (spilledState.isFirstCycle() && doMemoryCalculation) {
// Do auto tuning
buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
}
}
if (spilledState.isFirstCycle()) {
// Do initial setup only on the first cycle
delayedSetup();
}
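// Initialize the build-side structures and the (optional) runtime filter support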
initializeBuild();
initializeRuntimeFilter();
// Make the calculator aware of our partitions
HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(
partitions);
buildCalc.setPartitionStatSet(partitionStatSet);
boolean moreData = true;
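// Main build loop: consume every incoming batch from the build (right) side and
// distribute its rows among the partitions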
while (moreData) {
switch (rightUpstream) {
case NONE:
case NOT_YET:
moreData = false;
continue;
case OK_NEW_SCHEMA:
if (!buildSchema.equals(buildBatch.getSchema())) {
throw SchemaChangeException.schemaChanged(
this.getClass().getSimpleName() + " does not support schema changes in build side.",
buildSchema, buildBatch.getSchema());
}
for (HashPartition partn : partitions) {
partn.updateBatches();
}
// Fall through
case OK:
batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true);
int currentRecordCount = buildBatch.getRecordCount();
// create runtime filter
if (spilledState.isFirstCycle() && enableRuntimeFilter) {
// create runtime filter and send out async
for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) {
int fieldId = bloomFilter2buildId.get(bloomFilter);
for (int ind = 0; ind < currentRecordCount; ind++) {
long hashCode = hash64.hash64Code(ind, 0, fieldId);
bloomFilter.insert(hashCode);
}
}
}
// Special treatment (when no spill, and single partition) -- use the
// incoming vectors as they are (no row copy)
if (numPartitions == 1) {
partitions[0].appendBatch(buildBatch);
break;
}
if (!spilledState.isFirstCycle()) {
// In later cycles the spilled build batches carry a pre-computed hash value (HV)
// column as their last vector; read the hash codes from it instead of recomputing
read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast();
}
// For every record in the build batch, hash the key columns and keep
// the result
for (int ind = 0; ind < currentRecordCount; ind++) {
int hashCode = spilledState.isFirstCycle()
? partitions[0].getBuildHashCode(ind)
: read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
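// The low-order bits of the hash code (the partition mask) select the target
// partition; the remaining bits are shifted down and stored along with the row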
int currPart = hashCode & spilledState.getPartitionMask();
hashCode >>>= spilledState.getBitsInMask();
// Append the new inner row to the appropriate partition; spill (that
// partition) if needed
partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind,
hashCode, buildCalc);
}
if (read_right_HV_vector != null) {
read_right_HV_vector.clear();
read_right_HV_vector = null;
}
break;
default:
throw new IllegalStateException(rightUpstream.name());
}
// Get the next incoming record batch
rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
}
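// The build side has been fully consumed; on the first cycle, send out any
// populated runtime (Bloom) filters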
if (spilledState.isFirstCycle() && enableRuntimeFilter) {
if (!bloomFilter2buildId.isEmpty()) {
int hashJoinOpId = this.popConfig.getOperatorId();
runtimeFilterReporter.sendOut(bloomFilters, probeFields,
runtimeFilterDef, hashJoinOpId);
}
}
// Move the remaining current batches into their temp lists, or spill
// them if the partition is spilled. Add the spilled partitions into
// the spilled partitions list
if (numPartitions > 1) { // a single partition needs no completion
for (HashPartition partn : partitions) {
partn.completeAnInnerBatch(false, partn.isSpilled());
}
}
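// Fetch the first probe (left) side batch and check whether prefetching hit an error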
prefetchFirstProbeBatch();
if (leftUpstream.isError()) {
// A termination condition was reached while prefetching the first probe
// side data holding batch. We need to terminate.
return leftUpstream;
}
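// Move the memory calculator to its post-build stage; it decides whether the
// remaining in-memory partitions can build their hash tables or must also spill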
HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc
.next();
postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
// Traverse all the in-memory partitions' incoming batches, and build their
// hash tables
for (int index = 0; index < partitions.length; index++) {
HashPartition partn = partitions[index];
if (partn.isSpilled()) {
// Don't build hash tables for spilled partitions
continue;
}
try {
if (postBuildCalc.shouldSpill()) {
// Spill this partition if we need to make room
partn.spillThisPartition();
} else {
// Only build hash tables for partitions that are not spilled
partn.buildContainersHashTableAndHelper();
}
} catch (OutOfMemoryException e) {
String message = "Failed building hash table on partition " + index
+ ":\n" + makeDebugString() + "\n"
+ postBuildCalc.makeDebugString();
// Include debug info
throw new OutOfMemoryException(message, e);
}
}
if (logger.isDebugEnabled()) {
logger.debug(postBuildCalc.makeDebugString());
}
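// Register every spilled partition with the spilled state so it can be read back
// and processed in a later cycle, and remember it for the probe (outer) side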
for (HashPartition partn : partitions) {
if (partn.isSpilled()) {
SpilledPartition sp = new SpilledPartition(
spilledState.getCycle(), partn.getPartitionNum(), originalPartition,
partn.getPartitionBatchesCount(), partn.getSpillFile());
spilledState.addPartition(sp);
spilledInners[partn.getPartitionNum()] = sp; // for the outer (probe) side to find this SpilledPartition later
partn.closeWriter();
partn.updateProbeRecordsPerBatch(
postBuildCalc.getProbeRecordsPerBatch());
}
}
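// Build phase completed normally; a null outcome signals no early termination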
return null;
}