public void write()

in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java [485:579]


    public void write(byte b[], int off, int len)
        throws IOException {
      // must always verify the invariant that at least METASIZE bytes are
      // available beyond kvindex, even when len == 0
      bufferRemaining -= len;
      if (bufferRemaining <= 0) {
        // writing these bytes could exhaust available buffer space or fill
        // the buffer to soft limit. check if spill or blocking are necessary
        boolean blockwrite = false;
        spillLock.lock();
        try {
          do {
            checkSpillException();

            final int kvbidx = 4 * kvindex;
            final int kvbend = 4 * kvend;
            // ser distance to key index
            final int distkvi = distanceTo(bufindex, kvbidx);
            // ser distance to spill end index
            final int distkve = distanceTo(bufindex, kvbend);

            // if kvindex is closer than kvend, then a spill is neither in
            // progress nor complete and reset since the lock was held. The
            // write should block only if there is insufficient space to
            // complete the current write, write the metadata for this record,
            // and write the metadata for the next record. If kvend is closer,
            // then the write should block if there is too little space for
            // either the metadata or the current write. Note that collect
            // ensures its metadata requirement with a zero-length write
            blockwrite = distkvi <= distkve
              ? distkvi <= len + 2 * METASIZE
              : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;

            if (!spillInProgress) {
              if (blockwrite) {
                if ((kvbend + METASIZE) % kvbuffer.length !=
                    equator - (equator % METASIZE)) {
                  // spill finished, reclaim space
                  // need to use meta exclusively; zero-len rec & 100% spill
                  // pcnt would fail
                  resetSpill(); // resetSpill doesn't move bufindex, kvindex
                  bufferRemaining = Math.min(
                      distkvi - 2 * METASIZE,
                      softLimit - distanceTo(kvbidx, bufindex)) - len;
                  continue;
                }
                // we have records we can spill; only spill if blocked
                if (kvindex != kvend) {
                  startSpill();
                  // Blocked on this write, waiting for the spill just
                  // initiated to finish. Instead of repositioning the marker
                  // and copying the partial record, we set the record start
                  // to be the new equator
                  setEquator(bufmark);
                } else {
                  // We have no buffered records, and this record is too large
                  // to write into kvbuffer. We must spill it directly from
                  // collect
                  final int size = distanceTo(bufstart, bufindex) + len;
                  setEquator(0);
                  bufstart = bufend = bufindex = equator;
                  kvstart = kvend = kvindex;
                  bufvoid = kvbuffer.length;
                  throw new MapBufferTooSmallException(size + " bytes");
                }
              }
            }

            if (blockwrite) {
              // wait for spill
              try {
                while (spillInProgress) {
                  spillDone.await();
                }
              } catch (InterruptedException e) {
                  throw new IOException(
                      "Buffer interrupted while waiting for the writer", e);
              }
            }
          } while (blockwrite);
        } finally {
          spillLock.unlock();
        }
      }
      // here, we know that we have sufficient space to write
      if (bufindex + len > bufvoid) {
        final int gaplen = bufvoid - bufindex;
        System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
        len -= gaplen;
        off += gaplen;
        bufindex = 0;
      }
      System.arraycopy(b, off, kvbuffer, bufindex, len);
      bufindex += len;
    }