in tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java [485:579]
/**
 * Copies {@code len} bytes from {@code b}, starting at {@code off}, into
 * {@code kvbuffer} at {@code bufindex}, wrapping around to index 0 when the
 * write would run past {@code bufvoid}.
 *
 * <p>{@code bufferRemaining} is decremented by {@code len} first. When it
 * drops to zero or below, the method takes {@code spillLock} and decides
 * whether the write must block: it may reclaim space from a finished spill,
 * start a new spill, or wait on {@code spillDone} until no spill is in
 * progress.
 *
 * @param b   source array
 * @param off offset of the first byte to copy from {@code b}
 * @param len number of bytes to copy; a zero-length write still performs the
 *            space check
 * @throws MapBufferTooSmallException if the write is blocked, no spill is in
 *         progress and there are no buffered records to spill
 *         ({@code kvindex == kvend}); the buffer indices are reset before
 *         throwing
 * @throws IOException if the thread is interrupted while waiting for a spill
 *         to finish (the interrupt status is restored before throwing), or
 *         if {@code checkSpillException()} reports a failure
 */
public void write(byte b[], int off, int len)
    throws IOException {
  // must always verify the invariant that at least METASIZE bytes are
  // available beyond kvindex, even when len == 0
  bufferRemaining -= len;
  if (bufferRemaining <= 0) {
    // writing these bytes could exhaust available buffer space or fill
    // the buffer to soft limit. check if spill or blocking are necessary
    boolean blockwrite = false;
    spillLock.lock();
    try {
      do {
        checkSpillException();

        // metadata indices scaled by 4 so they can be compared with
        // positions in kvbuffer
        final int kvbidx = 4 * kvindex;
        final int kvbend = 4 * kvend;
        // ser distance to key index
        final int distkvi = distanceTo(bufindex, kvbidx);
        // ser distance to spill end index
        final int distkve = distanceTo(bufindex, kvbend);

        // if kvindex is closer than kvend, then a spill is neither in
        // progress nor complete and reset since the lock was held. The
        // write should block only if there is insufficient space to
        // complete the current write, write the metadata for this record,
        // and write the metadata for the next record. If kvend is closer,
        // then the write should block if there is too little space for
        // either the metadata or the current write. Note that collect
        // ensures its metadata requirement with a zero-length write
        blockwrite = distkvi <= distkve
            ? distkvi <= len + 2 * METASIZE
            : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;

        if (!spillInProgress) {
          if (blockwrite) {
            if ((kvbend + METASIZE) % kvbuffer.length !=
                equator - (equator % METASIZE)) {
              // spill finished, reclaim space
              // need to use meta exclusively; zero-len rec & 100% spill
              // pcnt would fail
              resetSpill(); // resetSpill doesn't move bufindex, kvindex
              bufferRemaining = Math.min(
                  distkvi - 2 * METASIZE,
                  softLimit - distanceTo(kvbidx, bufindex)) - len;
              // re-evaluate the blocking condition with the reclaimed space
              continue;
            }
            // we have records we can spill; only spill if blocked
            if (kvindex != kvend) {
              startSpill();
              // Blocked on this write, waiting for the spill just
              // initiated to finish. Instead of repositioning the marker
              // and copying the partial record, we set the record start
              // to be the new equator
              setEquator(bufmark);
            } else {
              // We have no buffered records, and this record is too large
              // to write into kvbuffer. We must spill it directly from
              // collect
              final int size = distanceTo(bufstart, bufindex) + len;
              setEquator(0);
              bufstart = bufend = bufindex = equator;
              kvstart = kvend = kvindex;
              bufvoid = kvbuffer.length;
              throw new MapBufferTooSmallException(size + " bytes");
            }
          }
        }

        if (blockwrite) {
          // wait for spill
          try {
            while (spillInProgress) {
              spillDone.await();
            }
          } catch (InterruptedException e) {
            // restore the interrupt status so that code further up the
            // stack can still observe the interruption after it has been
            // converted into an IOException
            Thread.currentThread().interrupt();
            throw new IOException(
                "Buffer interrupted while waiting for the writer", e);
          }
        }
      } while (blockwrite);
    } finally {
      spillLock.unlock();
    }
  }
  // here, we know that we have sufficient space to write
  // compare len against the gap up to bufvoid instead of computing
  // bufindex + len, whose int sum can overflow and skip the wrap-around
  if (len > bufvoid - bufindex) {
    // fill the tail of the buffer, then continue from index 0
    final int gaplen = bufvoid - bufindex;
    System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
    len -= gaplen;
    off += gaplen;
    bufindex = 0;
  }
  System.arraycopy(b, off, kvbuffer, bufindex, len);
  bufindex += len;
}