public IterOutcome executeBuildPhase()

in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java [981:1189]


  /**
   * Executes the build phase: reads all build-side (right) batches, hashes
   * each row's key columns and appends the row to the matching partition
   * (spilling partitions under memory pressure), then builds hash tables for
   * the partitions that remained in memory and registers the spilled ones
   * for a later cycle.
   *
   * @return a terminating {@link IterOutcome} if prefetching the first probe
   *         batch failed, otherwise null
   */
  public IterOutcome executeBuildPhase() throws SchemaChangeException {
    if (buildSideIsEmpty.booleanValue()) {
      // Empty build (right) side: nothing to build
      return null;
    }

    if (skipHashTableBuild) {
      // No hash table needed: just consume (drain) the entire right upstream
      killAndDrainRightUpstream();
      return null;
    }

    HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;

    {
      // Initializing build calculator
      // Limit scope of these variables to this block
      int maxBatchSize = spilledState.isFirstCycle()
          ? RecordBatch.MAX_BATCH_ROW_COUNT
          : RECORDS_PER_BATCH;
      boolean doMemoryCalculation = canSpill
          && !probeSideIsEmpty.booleanValue();
      HashJoinMemoryCalculator calc = getCalculatorImpl();

      calc.initialize(doMemoryCalculation);
      buildCalc = calc.next();
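      // The memory calculator is staged: calc.next() yields the build-side
      // partitioning phase here, and buildCalc.next() later yields the
      // post-build (pre-probe) calculations.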

      buildCalc.initialize(spilledState.isFirstCycle(),
          true, // TODO: fix after the growing-hash-values bug is fixed
          buildBatch, probeBatch, buildJoinColumns,
          probeSideIsEmpty.booleanValue(), allocator.getLimit(), numPartitions,
          RECORDS_PER_BATCH, RECORDS_PER_BATCH, maxBatchSize, maxBatchSize,
          batchMemoryManager.getOutputBatchSize(),
          HashTable.DEFAULT_LOAD_FACTOR);

      if (spilledState.isFirstCycle() && doMemoryCalculation) {
        // Do auto tuning
        buildCalc = partitionNumTuning(maxBatchSize, buildCalc);
      }
    }

    if (spilledState.isFirstCycle()) {
      // Do initial setup only on the first cycle
      delayedSetup();
    }

    initializeBuild();

    initializeRuntimeFilter();

    // Make the calculator aware of our partitions
    HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(
        partitions);
    buildCalc.setPartitionStatSet(partitionStatSet);

    boolean moreData = true;
    while (moreData) {
      switch (rightUpstream) {
      case NONE:
      case NOT_YET:
        moreData = false;
        continue;

      case OK_NEW_SCHEMA:
        if (!buildSchema.equals(buildBatch.getSchema())) {
          throw SchemaChangeException.schemaChanged(
              getClass().getSimpleName() + " does not support schema changes on the build side.",
              buildSchema, buildBatch.getSchema());
        }
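        // Same schema, but OK_NEW_SCHEMA still delivers new vector instances,
        // so re-point each partition at the incoming batch's vectors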
        for (HashPartition partn : partitions) {
          partn.updateBatches();
        }
        // Fall through
      case OK:
        batchMemoryManager.update(buildBatch, RIGHT_INDEX, 0, true);
        int currentRecordCount = buildBatch.getRecordCount();
        // Build the runtime filter: insert every build row's key hash into
        // each Bloom filter (the filters are sent out asynchronously later)
        if (spilledState.isFirstCycle() && enableRuntimeFilter) {
          for (Map.Entry<BloomFilter, Integer> entry
              : bloomFilter2buildId.entrySet()) {
            BloomFilter bloomFilter = entry.getKey();
            int fieldId = entry.getValue();
            for (int ind = 0; ind < currentRecordCount; ind++) {
              long hashCode = hash64.hash64Code(ind, 0, fieldId);
              bloomFilter.insert(hashCode);
            }
          }
        }
        // Special treatment when there is a single partition and no spilling:
        // use the incoming vectors as they are (no row copy)
        if (numPartitions == 1) {
          partitions[0].appendBatch(buildBatch);
          break;
        }

        if (!spilledState.isFirstCycle()) {
          // In later cycles the build-side hash values were computed in the
          // first cycle and spilled with the rows as a trailing HV column;
          // read them back rather than recomputing
          read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast();
        }

        // For every record in the build batch, hash the key columns and keep
        // the result
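        // (e.g. with 16 partitions the mask would be 0xF: the low 4 bits of
        // the hash select the partition, and the shifted-out remainder is
        // kept for the in-partition hash table and any later spill cycle)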
        for (int ind = 0; ind < currentRecordCount; ind++) {
          int hashCode = spilledState.isFirstCycle()
              ? partitions[0].getBuildHashCode(ind)
              : read_right_HV_vector.getAccessor().get(ind); // read the hash value from the HV column
          int currPart = hashCode & spilledState.getPartitionMask();
          hashCode >>>= spilledState.getBitsInMask();
          // Append the inner row to its partition; spill that partition if needed
          partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind,
              hashCode, buildCalc);
        }

        if (read_right_HV_vector != null) {
          read_right_HV_vector.clear();
          read_right_HV_vector = null;
        }
        break;
      default:
        throw new IllegalStateException(
            "Unexpected upstream state: " + rightUpstream.name());
      }
      // Get the next incoming record batch
      rightUpstream = next(HashJoinHelper.RIGHT_INPUT, buildBatch);
    }

    // The build side has been fully consumed, so send the runtime filter(s)
    // out asynchronously
    if (spilledState.isFirstCycle() && enableRuntimeFilter
        && !bloomFilter2buildId.isEmpty()) {
      int hashJoinOpId = popConfig.getOperatorId();
      runtimeFilterReporter.sendOut(bloomFilters, probeFields,
          runtimeFilterDef, hashJoinOpId);
    }

    // Move the remaining current batches into their temp lists, or spill
    // them if the partition is spilled. Add the spilled partitions into
    // the spilled partitions list
    if (numPartitions > 1) { // a single partition needs no completion
      for (HashPartition partn : partitions) {
        partn.completeAnInnerBatch(false, partn.isSpilled());
      }
    }

    // Prefetch the first probe batch now: this reveals whether the probe side
    // is empty (used by the post-build calculations below) and surfaces any
    // probe-side error early
    prefetchFirstProbeBatch();

    if (leftUpstream.isError()) {
      // A termination condition was reached while prefetching the first
      // probe-side batch, so terminate and propagate that outcome
      return leftUpstream;
    }

    HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc
        .next();
    postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty

    // Traverse all the in-memory partitions' incoming batches, and build their
    // hash tables

    for (int index = 0; index < partitions.length; index++) {
      HashPartition partn = partitions[index];

      if (partn.isSpilled()) {
        // Don't build hash tables for spilled partitions
        continue;
      }

      try {
        if (postBuildCalc.shouldSpill()) {
          // Spill this partition if we need to make room
          partn.spillThisPartition();
        } else {
          // Only build hash tables for partitions that are not spilled
          partn.buildContainersHashTableAndHelper();
        }
      } catch (OutOfMemoryException e) {
        // Include debug info in the error message
        String message = "Failed building hash table on partition " + index
            + ":\n" + makeDebugString() + "\n"
            + postBuildCalc.makeDebugString();
        throw new OutOfMemoryException(message, e);
      }
    }

    if (logger.isDebugEnabled()) {
      logger.debug(postBuildCalc.makeDebugString());
    }

    for (HashPartition partn : partitions) {
      if (partn.isSpilled()) {
        SpilledPartition sp = new SpilledPartition(
            spilledState.getCycle(), partn.getPartitionNum(), originalPartition,
            partn.getPartitionBatchesCount(), partn.getSpillFile());

        spilledState.addPartition(sp);
        // Remember it so the outer (probe) side can find this spilled
        // partition later
        spilledInners[partn.getPartitionNum()] = sp;
        partn.closeWriter();

        partn.updateProbeRecordsPerBatch(
            postBuildCalc.getProbeRecordsPerBatch());
      }
    }

    return null;
  }
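
A minimal standalone sketch of the partition-selection scheme used in the row
loop above (the constants and sample hashes are illustrative, not Drill's
actual configuration): the low bits of each key's hash pick a partition, and
the shifted-out remainder is carried along so that a later spill cycle
re-splits on fresh bits.

public class HashSplitSketch {
  static final int NUM_PARTITIONS = 16; // assumed to be a power of two
  static final int PARTITION_MASK = NUM_PARTITIONS - 1; // 0xF
  static final int BITS_IN_MASK = Integer.numberOfTrailingZeros(NUM_PARTITIONS); // 4

  public static void main(String[] args) {
    int[] sampleHashes = {0x12345678, 0x9ABCDEF0, 0x0F1E2D3C};
    for (int hash : sampleHashes) {
      int partition = hash & PARTITION_MASK; // low bits choose the partition
      int remainder = hash >>> BITS_IN_MASK; // remaining bits feed the next cycle
      System.out.printf("hash=%08x -> partition %d, remainder=%x%n",
          hash, partition, remainder);
    }
  }
}

The runtime-filter step follows the same pattern as this sketch, which uses
Guava's BloomFilter purely for illustration (Drill has its own BloomFilter
class with an insert(long) API, as seen above): the build phase inserts the
64-bit hash of every build-side key, and the probe side can then cheaply skip
rows whose hash is definitely absent.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;

public class RuntimeFilterSketch {
  public static void main(String[] args) {
    // Sized for the expected number of build rows, with a 1% false-positive rate
    BloomFilter<Long> filter =
        BloomFilter.create(Funnels.longFunnel(), 10_000, 0.01);

    long[] buildKeyHashes = {42L, 7L, 1234L}; // stand-ins for hash64Code(...) values
    for (long h : buildKeyHashes) {
      filter.put(h); // build phase: insert every key hash
    }

    long probeKeyHash = 99L;
    if (!filter.mightContain(probeKeyHash)) {
      // Definitely not on the build side, so this probe row cannot match
      System.out.println("probe row filtered out early");
    }
  }
}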