protected void spill()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java [726:824]


  protected void spill(int mstart, int mend)
      throws IOException, InterruptedException {

    //approximate the length of the output file to be the length of the
    //buffer + header lengths for the partitions
    final long size = (bufend >= bufstart
        ? bufend - bufstart
        : (bufvoid - bufend) + bufstart) +
                partitions * APPROX_HEADER_LENGTH;
    FSDataOutputStream out = null;
    try {
      // create spill file
      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
      final Path filename =
          mapOutputFile.getSpillFileForWrite(numSpills, size);
      out = rfs.create(filename);

      int spindex = mstart;
      final InMemValBytes value = createInMemValBytes();
      for (int i = 0; i < partitions; ++i) {
        IFile.Writer writer = null;
        try {
          long segmentStart = out.getPos();
          writer = new Writer(conf, out, keyClass, valClass, codec,
                                    spilledRecordsCounter, null);
          if (combiner == null) {
            // spill directly
            DataInputBuffer key = new DataInputBuffer();
            while (spindex < mend &&
                kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
              final int kvoff = offsetFor(spindex);
              int keystart = kvmeta.get(kvoff + KEYSTART);
              int valstart = kvmeta.get(kvoff + VALSTART);
              key.reset(kvbuffer, keystart, valstart - keystart);
              getVBytesForOffset(kvoff, value);
              writer.append(key, value);
              ++spindex;
            }
          } else {
            int spstart = spindex;
            while (spindex < mend &&
                kvmeta.get(offsetFor(spindex)
                          + PARTITION) == i) {
              ++spindex;
            }
            // Note: we would like to avoid the combiner if we've fewer
            // than some threshold of records for a partition
            if (spstart != spindex) {
              TezRawKeyValueIterator kvIter =
                new MRResultIterator(spstart, spindex);
              if (LOG.isDebugEnabled()) {
                LOG.debug("Running combine processor");
              }
              runCombineProcessor(kvIter, writer);
            }
          }

          // close the writer
          writer.close();
          if (numSpills > 0) {
            additionalSpillBytesWritten.increment(writer.getCompressedLength());
            numAdditionalSpills.increment(1);
            // Reset the value will be set during the final merge.
            outputBytesWithOverheadCounter.setValue(0);
          } else {
            // Set this up for the first write only. Subsequent ones will be handled in the final merge.
            outputBytesWithOverheadCounter.increment(writer.getRawLength());
          }
          // record offsets
          final TezIndexRecord rec =
              new TezIndexRecord(
                  segmentStart,
                  writer.getRawLength(),
                  writer.getCompressedLength());
          spillRec.putIndex(rec, i);

          writer = null;
        } finally {
          if (null != writer) writer.close();
        }
      }

      if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
        // create spill index file
        Path indexFilename =
            mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                * MAP_OUTPUT_INDEX_RECORD_LENGTH);
        spillRec.writeToFile(indexFilename, conf);
      } else {
        indexCacheList.add(spillRec);
        totalIndexCacheMemory +=
          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
      }
      LOG.info("Finished spill " + numSpills);
      ++numSpills;
    } finally {
      if (out != null) out.close();
    }
  }