protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCount)

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java [882:991]


  /**
   * Writes the in-memory records whose metadata indices lie in {@code [mstart, mend)}
   * to a new spill file on local disk, emitting one IFile segment per partition, and
   * records each segment's (offset, rawLength, compressedLength) either in the
   * in-memory index cache or — once the cache memory limit is reached — in a separate
   * spill index file.
   *
   * <p>Assumes the records in {@code [mstart, mend)} are ordered by partition, so each
   * partition's records form one contiguous run consumed by the shared {@code spindex}
   * cursor (TODO confirm against the caller's sort step).
   *
   * @param mstart         first record index (inclusive) in kvmeta to spill
   * @param mend           last record index (exclusive) in kvmeta to spill
   * @param sameKeyCount   count of adjacent equal keys; combined with
   *                       {@code totalKeysCount} to decide whether RLE key
   *                       compression is worthwhile for the writers
   * @param totalKeysCount total number of keys in this spill range
   * @throws IOException          if creating or writing the spill/index file fails
   * @throws InterruptedException if the combiner run is interrupted
   */
  protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCount)
      throws IOException, InterruptedException {

    //approximate the length of the output file to be the length of the
    //buffer + header lengths for the partitions
    // (bufend < bufstart means the live data wraps around the circular kvbuffer)
    final long size = (bufend >= bufstart
        ? bufend - bufstart
        : (bufvoid - bufend) + bufstart) +
                partitions * APPROX_HEADER_LENGTH;
    FSDataOutputStream out = null;
    try {
      // create spill file
      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
      final Path filename =
          mapOutputFile.getSpillFileForWrite(numSpills, size);
      spillFilePaths.put(numSpills, filename);
      out = rfs.create(filename);
      ensureSpillFilePermissions(filename, rfs);

      // Single cursor over the (partition-ordered) record range; each iteration of
      // the partition loop below consumes this partition's contiguous run of records.
      int spindex = mstart;
      final InMemValBytes value = createInMemValBytes();
      boolean rle = isRLENeeded(sameKeyCount, totalKeysCount);
      for (int i = 0; i < partitions; ++i) {
        IFile.Writer writer = null;
        try {
          // Segment offset must be captured before the writer emits anything.
          long segmentStart = out.getPos();
          // Precedence note: '&&' binds tighter than '||' — a writer is created when
          // this partition has at least one record, OR when empty-partition segments
          // must still be written out (!sendEmptyPartitionDetails). Otherwise writer
          // stays null and the guards below skip the segment entirely.
          if (spindex < mend && kvmeta.get(offsetFor(spindex) + PARTITION) == i
              || !sendEmptyPartitionDetails) {
            writer = new Writer(serializationContext.getKeySerialization(),
                serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
                serializationContext.getValueClass(), codec, spilledRecordsCounter, null, rle);
          }
          if (combiner == null) {
            // spill directly
            // (writer cannot be null inside this loop: a non-empty run for partition i
            // implies the creation condition above was true)
            DataInputBuffer key = new DataInputBuffer();
            while (spindex < mend &&
                kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
              final int kvoff = offsetFor(spindex);
              int keystart = kvmeta.get(kvoff + KEYSTART);
              int valstart = kvmeta.get(kvoff + VALSTART);
              key.reset(kvbuffer, keystart, valstart - keystart);
              getVBytesForOffset(kvoff, value);
              writer.append(key, value);
              ++spindex;
            }
          } else {
            // With a combiner: first advance spindex past this partition's run, then
            // feed the [spstart, spindex) window through the combiner into the writer.
            int spstart = spindex;
            while (spindex < mend &&
                kvmeta.get(offsetFor(spindex)
                          + PARTITION) == i) {
              ++spindex;
            }
            // Note: we would like to avoid the combiner if we've fewer
            // than some threshold of records for a partition
            if (spstart != spindex) {
              TezRawKeyValueIterator kvIter =
                new MRResultIterator(spstart, spindex);
              if (LOG.isDebugEnabled()) {
                LOG.debug(outputContext.getInputOutputVertexNames() + ": " + "Running combine processor");
              }
              runCombineProcessor(kvIter, writer);
            }
          }
          // Lengths stay 0 for a skipped (null-writer) empty partition.
          long rawLength = 0;
          long partLength = 0;
          // close the writer
          if (writer != null) {
            writer.close();
            rawLength = writer.getRawLength();
            partLength = writer.getCompressedLength();
          }
          adjustSpillCounters(rawLength, partLength);
          // record offsets
          final TezIndexRecord rec =
              new TezIndexRecord(segmentStart, rawLength, partLength);
          spillRec.putIndex(rec, i);
          if (!isFinalMergeEnabled() && reportPartitionStats() && writer != null) {
            partitionStats[i] += rawLength;
          }
          // Null out so the finally clause below does not close the writer a second
          // time on the success path; finally only cleans up after an exception.
          writer = null;
        } finally {
          if (null != writer) writer.close();
        }
      }

      // Keep spill indices in memory until the cache budget is exhausted; after that,
      // each spill's index is persisted to its own index file instead.
      if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
        // create spill index file
        Path indexFilename =
            mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                * MAP_OUTPUT_INDEX_RECORD_LENGTH);
        spillFileIndexPaths.put(numSpills, indexFilename);
        spillRec.writeToFile(indexFilename, conf, localFs);
      } else {
        indexCacheList.add(spillRec);
        totalIndexCacheMemory +=
          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
      }
      LOG.info(outputContext.getInputOutputVertexNames() + ": " + "Finished spill " + numSpills
      + " at " + filename.toString());
      ++numSpills;
      // Without a final merge every spill is shipped as a shuffle chunk; with a final
      // merge, any spill beyond the first counts as an "additional" spill.
      if (!isFinalMergeEnabled()) {
        numShuffleChunks.setValue(numSpills);
      } else if (numSpills > 1) {
        //Increment only when there was atleast one previous spill
        numAdditionalSpills.increment(1);
      }
    } finally {
      if (out != null) out.close();
    }
  }