public IterOutcome innerNext()

in exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java [519:711]


  public IterOutcome innerNext() {
    if (wasKilled) {
      // We have received a cancel signal. We need to stop processing.
      cleanup();
      return IterOutcome.NONE;
    }

    prefetchFirstBuildBatch();

    if (rightUpstream.isError()) {
      // A termination condition was reached while prefetching the first build
      // side data holding batch.
      // We need to terminate.
      return rightUpstream;
    }

    boolean isExistException = false;

    try {
      /*
       * If we are here for the first time, execute the build phase of the hash
       * join and setup the run time generated class for the probe side
       */
      if (state == BatchState.FIRST) {
        // Build the hash table, using the build side record batches.
        IterOutcome buildExecuteTermination = executeBuildPhase();

        if (buildExecuteTermination != null) {
          // A termination condition was reached while executing the build
          // phase.
          // We need to terminate.
          return buildExecuteTermination;
        }

        buildComplete = true;

        if (isRowKeyJoin) {
          // discard the first left batch which was fetched by buildSchema, and
          // get the new one based on rowkey join
          leftUpstream = next(left);
        }

        // Update the hash table related stats for the operator
        updateStats();
      }

      // Try to probe and project, or recursively handle a spilled partition
      if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
          joinIsLeftOrFull) { // or if this is a left/full outer join

        prefetchFirstProbeBatch();

        if (leftUpstream.isError()
            || (leftUpstream == NONE && !joinIsRightOrFull)) {
          // A termination condition was reached while prefetching the first
          // probe side data holding batch.
          // We need to terminate.
          return leftUpstream;
        }

        if (!buildSideIsEmpty.booleanValue()
            || !probeSideIsEmpty.booleanValue()) {
          // Only allocate outgoing vectors and execute probing logic if there
          // is data

          if (state == BatchState.FIRST) {
            setupProbe();
          }

          // Allocate the memory for the vectors in the output container
          batchMemoryManager.allocateVectors(container);

          probe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());

          outputRecords = probe.probeAndProject();

          container.setValueCount(outputRecords);

          batchMemoryManager.updateOutgoingStats(outputRecords);
          RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this,
              getRecordBatchStatsContext());

          /*
           * We are here because of one the following 1. Completed processing of
           * all the records and we are done 2. We've filled up the outgoing
           * batch to the maximum and we need to return upstream Either case
           * build the output container's schema and return
           */
          if (outputRecords > 0 || state == BatchState.FIRST) {
            state = BatchState.NOT_FIRST;

            return IterOutcome.OK;
          }
        }

        // Free all partitions' in-memory data structures
        // (In case need to start processing spilled partitions)
        for (HashPartition partn : partitions) {
          partn.cleanup(false); // clean, but do not delete the spill files !!
        }

        //
        // (recursively) Handle the spilled partitions, if any
        //
        if (!buildSideIsEmpty.booleanValue()) {
          while (!spilledState.isEmpty()) { // "while" is only used for
                                            // skipping; see "continue" below

            // Get the next (previously) spilled partition to handle as incoming
            SpilledPartition currSp = spilledState
                .getNextSpilledPartition();

            // If the outer is empty (and it's not a right/full join) - try the
            // next spilled partition
            if (currSp.outerSpilledBatches == 0 && !joinIsRightOrFull) {
              continue;
            }

            // Create a BUILD-side "incoming" out of the inner spill file of
            // that partition
            buildBatch = new SpilledRecordBatch(currSp.innerSpillFile,
                currSp.innerSpilledBatches, context, buildSchema, oContext,
                spillSet);
            // The above ctor call also got the first batch; need to update the
            // outcome
            rightUpstream = ((SpilledRecordBatch) buildBatch)
                .getInitialOutcome();

            if (currSp.outerSpilledBatches > 0) {
              // Create a PROBE-side "incoming" out of the outer spill file of
              // that partition
              probeBatch = new SpilledRecordBatch(currSp.outerSpillFile,
                  currSp.outerSpilledBatches, context, probeSchema, oContext,
                  spillSet);
              // The above ctor call also got the first batch; need to update
              // the outcome
              leftUpstream = ((SpilledRecordBatch) probeBatch)
                  .getInitialOutcome();
            } else {
              probeBatch = left; // if no outer batch then reuse left - needed
                                 // for updateIncoming()
              leftUpstream = IterOutcome.NONE;
              probe.changeToFinalProbeState();
            }

            spilledState.updateCycle(stats, currSp, spilledStateUpdater);
            state = BatchState.FIRST; // TODO need to determine if this is still
                                      // necessary since
                                      // prefetchFirstBatchFromBothSides sets
                                      // this

            prefetchedBuild.setValue(false);
            prefetchedProbe.setValue(false);

            return innerNext(); // start processing the next spilled partition
                                // "recursively"
          }
        }

      } else {
        // Our build side is empty, we won't have any matches, clear the probe
        // side
        killAndDrainLeftUpstream();
      }

      // No more output records, clean up and return
      state = BatchState.DONE;

      cleanup();

      return IterOutcome.NONE;
    } catch (SchemaChangeException e) {
      throw UserException.schemaChangeError(e).build(logger);
    } catch (OutOfMemoryException oom) {
      isExistException = true;
      throw UserException.memoryError(oom).build(logger);
    } catch (Exception e) {
      //Internal catch OutOfMemoryException, resulting in throwing other exceptions or others
      isExistException = true;
      throw UserException.executionError(e).build(logger);
    } finally {
      boolean isReleaseBuildBatch = buildBatch != null && buildBatch instanceof SpilledRecordBatch;
      boolean isReleaseProbeBatch = probeBatch != null && probeBatch instanceof SpilledRecordBatch;
      //release buildBatch spill memory
      if (isExistException && isReleaseBuildBatch) {
        buildBatch.cancel();
      }
      //release probeBatch spill memory
      if (isExistException && isReleaseProbeBatch) {
        probeBatch.cancel();
      }
    }
  }