in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java [242:279]
public boolean hasNext() {
  // Reports whether an element is available, decoding one ahead of time when needed.
  // A successfully decoded element is kept in 'next' and flagged through 'hasNext'.
  if (hasNext) {
    // A previously decoded element is still buffered.
    return true;
  }
  if (cannotContinueDecoding) {
    // Set below once no further input streams remain.
    return false;
  }
  for (;;) {
    if (decoder == null) {
      // No active decoder: wrap the next input stream, or stop if there is none.
      try {
        if (!inputStreams.hasNext()) {
          cannotContinueDecoding = true;
          return false;
        }
        // Two counting layers: one directly on the raw stream, one on the stream
        // produced by buildInputStream with the serializer's decode chainers.
        serializedCountingStream = new CountingInputStream(inputStreams.next());
        encodedCountingStream = new CountingInputStream(
            buildInputStream(serializedCountingStream, serializer.getDecodeStreamChainers()));
        decoder = serializer.getDecoderFactory().create(encodedCountingStream);
      } catch (final IOException unrecoverable) {
        // We cannot recover IOException thrown by buildInputStream.
        throw new RuntimeException(unrecoverable);
      }
    }
    try {
      next = decoder.decode();
      hasNext = true;
      return true;
    } catch (final IOException endOfStream) {
      // IOException from decoder indicates EOF event: fold this stream's byte counts
      // into the running totals, drop the per-stream state, and loop to the next stream.
      numSerializedBytes += serializedCountingStream.getCount();
      numEncodedBytes += encodedCountingStream.getCount();
      serializedCountingStream = null;
      encodedCountingStream = null;
      decoder = null;
    }
  }
}