client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [493:519]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private BiConsumer<ByteBuf, TieredStorageSubpartitionId> getDataListener(
      TieredStoragePartitionId partitionId) {
    return (byteBuf, subPartitionId) -> {
      Queue<Buffer> unpackedBuffers = null;
      try {
        unpackedBuffers = ReceivedNoHeaderBufferPacker.unpack(byteBuf);
        while (!unpackedBuffers.isEmpty()) {
          onBuffer(partitionId, subPartitionId, unpackedBuffers.poll());
        }
      } catch (Throwable throwable) {
        synchronized (lock) {
          LOG.error(
              "Failed to process the received buffer, cause: {} throwable {}.",
              cause == null ? "" : cause,
              throwable);
          if (cause == null) {
            cause = throwable;
          }
        }
        notifyAvailable(partitionId, subPartitionId);
        if (unpackedBuffers != null) {
          unpackedBuffers.forEach(Buffer::recycleBuffer);
        }
        recycleAllResources();
      }
    };
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornTierConsumerAgent.java [493:519]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private BiConsumer<ByteBuf, TieredStorageSubpartitionId> getDataListener(
      TieredStoragePartitionId partitionId) {
    return (byteBuf, subPartitionId) -> {
      Queue<Buffer> unpackedBuffers = null;
      try {
        unpackedBuffers = ReceivedNoHeaderBufferPacker.unpack(byteBuf);
        while (!unpackedBuffers.isEmpty()) {
          onBuffer(partitionId, subPartitionId, unpackedBuffers.poll());
        }
      } catch (Throwable throwable) {
        synchronized (lock) {
          LOG.error(
              "Failed to process the received buffer, cause: {} throwable {}.",
              cause == null ? "" : cause,
              throwable);
          if (cause == null) {
            cause = throwable;
          }
        }
        notifyAvailable(partitionId, subPartitionId);
        if (unpackedBuffers != null) {
          unpackedBuffers.forEach(Buffer::recycleBuffer);
        }
        recycleAllResources();
      }
    };
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



