private boolean fillBuffer()

in client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java [749:870]


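    /**
     * Fills rawDataBuf with the payload of the next batch that survives
     * de-duplication, setting position and limit accordingly. Returns true
     * when such a batch was read, false when no more data is available.
     */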
    private boolean fillBuffer() throws IOException {
      try {
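        // Lazily initialize the reader and fetch the first chunk on the first call.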
        if (firstChunk && currentReader != null) {
          init();
          currentChunk = getNextChunk();
          firstChunk = false;
        }
        if (currentChunk == null) {
          close();
          return false;
        }

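        // Reusable probe for failed-batch lookups, mutated in place to avoid per-batch allocation.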
        PushFailedBatch failedBatch = new PushFailedBatch(-1, -1, -1);
        boolean hasData = false;
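        // Scan batches until one survives de-duplication, advancing to the next chunk
        // whenever the current one is drained.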
        while (currentChunk.isReadable() || moveToNextChunk()) {
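          // Read the 16-byte batch header: mapId, attemptId, batchId, and payload size.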
          currentChunk.readBytes(sizeBuf);
          int mapId = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET);
          int attemptId = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET + 4);
          int batchId = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET + 8);
          int size = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET + 12);

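          // Read the payload into the matching buffer, growing it when the batch
          // exceeds the current capacity.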
          if (shuffleCompressionEnabled) {
            if (size > compressedBuf.length) {
              compressedBuf = new byte[size];
            }

            currentChunk.readBytes(compressedBuf, 0, size);
          } else {
            if (size > rawDataBuf.length) {
              rawDataBuf = new byte[size];
            }

            currentChunk.readBytes(rawDataBuf, 0, size);
          }

          // De-duplicate: accept batches only from the successful attempt of each map task.
          if (attemptId == attempts[mapId]) {
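            // For skew partitions read without map ranges, batches recorded as failed
            // pushes may have been re-pushed and fetched elsewhere; skip them here.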
            if (readSkewPartitionWithoutMapRange) {
              Set<PushFailedBatch> failedBatchSet =
                  this.failedBatches.get(currentReader.getLocation().getUniqueId());
              if (null != failedBatchSet) {
                failedBatch.setMapId(mapId);
                failedBatch.setAttemptId(attemptId);
                failedBatch.setBatchId(batchId);
                if (failedBatchSet.contains(failedBatch)) {
                  logger.warn("Skip duplicated batch: {}.", failedBatch);
                  continue;
                }
              }
            }
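            // Track batch IDs per map task so replayed batches are detected below.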
            Set<Integer> batchSet = batchesRead.computeIfAbsent(mapId, k -> new HashSet<>());
            if (!batchSet.contains(batchId)) {
              batchSet.add(batchId);
              callback.incBytesRead(BATCH_HEADER_SIZE + size);
              if (shuffleCompressionEnabled) {
                // Decompress into rawDataBuf, growing it to the original length first.
                int originalLength = decompressor.getOriginalLen(compressedBuf);
                if (rawDataBuf.length < originalLength) {
                  rawDataBuf = new byte[originalLength];
                }
                limit = decompressor.decompress(compressedBuf, rawDataBuf, 0);
              } else {
                limit = size;
              }
              position = 0;
              hasData = true;
              break;
            } else {
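              // Duplicate batch: count the bytes for metrics but do not surface the data again.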
              callback.incDuplicateBytesRead(BATCH_HEADER_SIZE + size);
              logger.debug(
                  "Skip duplicated batch: mapId {}, attemptId {}, batchId {}.",
                  mapId,
                  attemptId,
                  batchId);
            }
          }
        }

        return hasData;
      } catch (LZ4Exception | ZstdException | IOException e) {
        logger.error(
            "Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {}, partitionId {}, location {}",
            appShuffleId,
            shuffleId,
            partitionId,
            Optional.ofNullable(currentReader).map(PartitionReader::getLocation).orElse(null),
            e);
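        // Preserve IOExceptions as-is; wrap LZ4/Zstd decompression failures in an IOException.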
        IOException ioe;
        if (e instanceof IOException) {
          ioe = (IOException) e;
        } else {
          ioe = new IOException(e);
        }
        if (exceptionMaker != null) {
          if (shuffleClient.reportShuffleFetchFailure(appShuffleId, shuffleId, taskId)) {
            /*
             * For Spark applications with celeborn.client.spark.fetch.throwsFetchFailure
             * enabled, [[ExceptionMaker.makeFetchFailureException]] creates a
             * FetchFailedException, which marks the TaskContext as failed due to a
             * shuffle fetch issue - see SPARK-19276 for details. Given this, Celeborn
             * can wrap the FetchFailedException in a CelebornIOException.
             */
            ioe =
                new CelebornIOException(
                    exceptionMaker.makeFetchFailureException(
                        appShuffleId, shuffleId, partitionId, e));
          }
        }
        throw ioe;
      } catch (Exception e) {
        logger.error(
            "Failed to fill buffer from chunk. AppShuffleId {}, shuffleId {}, partitionId {}, location {}",
            appShuffleId,
            shuffleId,
            partitionId,
            Optional.ofNullable(currentReader).map(PartitionReader::getLocation).orElse(null),
            e);
        throw e;
      }
    }
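
For reference, a minimal standalone sketch of the 16-byte batch header that the loop above parses. The class and its names are hypothetical (not part of the Celeborn codebase); it assumes native byte order, mirroring Platform.getInt:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    // Hypothetical helper mirroring the batch header layout read in fillBuffer().
    final class BatchHeader {
      // Four ints: mapId, attemptId, batchId, payload size.
      static final int BATCH_HEADER_SIZE = 16;

      final int mapId;
      final int attemptId;
      final int batchId;
      final int size; // length in bytes of the payload that follows the header

      BatchHeader(int mapId, int attemptId, int batchId, int size) {
        this.mapId = mapId;
        this.attemptId = attemptId;
        this.batchId = batchId;
        this.size = size;
      }

      // Parses the first 16 bytes of buf; Platform.getInt reads ints in the
      // platform's native endianness, so nativeOrder() is assumed here.
      static BatchHeader parse(byte[] buf) {
        ByteBuffer bb = ByteBuffer.wrap(buf, 0, BATCH_HEADER_SIZE).order(ByteOrder.nativeOrder());
        return new BatchHeader(bb.getInt(), bb.getInt(), bb.getInt(), bb.getInt());
      }
    }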