in client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java [749:870]
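  /**
   * Stages the next not-yet-consumed batch into the read buffer. Returns true once a batch is
   * available and false when the chunk stream is exhausted.
   */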
  private boolean fillBuffer() throws IOException {
    try {
      if (firstChunk && currentReader != null) {
        init();
        currentChunk = getNextChunk();
        firstChunk = false;
      }
      if (currentChunk == null) {
        close();
        return false;
      }
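      // Reusable probe object for failed-batch lookups, so the loop below does not allocate a
      // new PushFailedBatch per record.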
      PushFailedBatch failedBatch = new PushFailedBatch(-1, -1, -1);
      boolean hasData = false;
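      // Each batch in a chunk is framed as a BATCH_HEADER_SIZE header of four ints (mapId,
      // attemptId, batchId, payload size) followed by the payload itself.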
      while (currentChunk.isReadable() || moveToNextChunk()) {
        currentChunk.readBytes(sizeBuf);
        int mapId = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET);
        int attemptId = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET + 4);
        int batchId = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET + 8);
        int size = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET + 12);
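        // Copy the payload out of the chunk, growing the staging buffer on demand; the buffers
        // are reused across batches and only ever grow.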
        if (shuffleCompressionEnabled) {
          if (size > compressedBuf.length) {
            compressedBuf = new byte[size];
          }
          currentChunk.readBytes(compressedBuf, 0, size);
        } else {
          if (size > rawDataBuf.length) {
            rawDataBuf = new byte[size];
          }
          currentChunk.readBytes(rawDataBuf, 0, size);
        }
        // De-duplicate: only consume batches written by the winning attempt of each map task.
        if (attemptId == attempts[mapId]) {
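          // For skew partitions read without a map range, skip any batch recorded as a failed
          // push for this location (presumably re-pushed and served elsewhere, so reading it
          // here would duplicate data).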
          if (readSkewPartitionWithoutMapRange) {
            Set<PushFailedBatch> failedBatchSet =
                this.failedBatches.get(currentReader.getLocation().getUniqueId());
            if (null != failedBatchSet) {
              failedBatch.setMapId(mapId);
              failedBatch.setAttemptId(attemptId);
              failedBatch.setBatchId(batchId);
              if (failedBatchSet.contains(failedBatch)) {
                logger.warn("Skip duplicated batch: {}.", failedBatch);
                continue;
              }
            }
          }
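          // Track consumed batch ids per mapId so a batch retransmitted by the pusher is only
          // surfaced once.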
          Set<Integer> batchSet = batchesRead.computeIfAbsent(mapId, k -> new HashSet<>());
          if (!batchSet.contains(batchId)) {
            batchSet.add(batchId);
            callback.incBytesRead(BATCH_HEADER_SIZE + size);
            if (shuffleCompressionEnabled) {
              // Decompress into rawDataBuf, growing it to hold the decoded length if needed.
              int originalLength = decompressor.getOriginalLen(compressedBuf);
              if (rawDataBuf.length < originalLength) {
                rawDataBuf = new byte[originalLength];
              }
              limit = decompressor.decompress(compressedBuf, rawDataBuf, 0);
            } else {
              limit = size;
            }
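            // Expose the decoded batch to the caller through [position, limit) and stop:
            // fillBuffer hands over exactly one batch at a time.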
            position = 0;
            hasData = true;
            break;
          } else {
            callback.incDuplicateBytesRead(BATCH_HEADER_SIZE + size);
            logger.debug(
                "Skip duplicated batch: mapId {}, attemptId {}, batchId {}.",
                mapId,
                attemptId,
                batchId);
          }
        }
      }
      return hasData;
    } catch (LZ4Exception | ZstdException | IOException e) {
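      // Codec failures (LZ4/Zstd) and I/O failures are handled together: log with full context,
      // then normalize to an IOException before deciding whether to convert it into a fetch
      // failure for Spark.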
      logger.error(
          "Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {}, partitionId {}, location {}",
          appShuffleId,
          shuffleId,
          partitionId,
          Optional.ofNullable(currentReader).map(PartitionReader::getLocation).orElse(null),
          e);
      IOException ioe;
      if (e instanceof IOException) {
        ioe = (IOException) e;
      } else {
        ioe = new IOException(e);
      }
      if (exceptionMaker != null) {
        if (shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId, taskId)) {
          /*
           * For Spark applications with celeborn.client.spark.fetch.throwsFetchFailure enabled,
           * [[ExceptionMaker.makeFetchFailureException]] creates a FetchFailedException, which
           * marks the TaskContext as failed with a shuffle fetch issue (see SPARK-19276).
           * Celeborn therefore wraps that FetchFailedException in a CelebornIOException.
           */
          ioe =
              new CelebornIOException(
                  exceptionMaker.makeFetchFailureException(
                      appShuffleId, shuffleId, partitionId, e));
        }
      }
      throw ioe;
    } catch (Exception e) {
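      // Anything else (e.g. a runtime error from chunk handling) is logged with the same context
      // and rethrown unchanged.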
      logger.error(
          "Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {}, partitionId {}, location {}",
          appShuffleId,
          shuffleId,
          partitionId,
          Optional.ofNullable(currentReader).map(PartitionReader::getLocation).orElse(null),
          e);
      throw e;
    }
  }