client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java [86:122]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public CelebornChannelBufferReader(
      FlinkShuffleClientImpl client,
      ShuffleResourceDescriptor shuffleDescriptor,
      TieredStorageInputChannelId inputChannelId,
      int startSubIdx,
      int endSubIdx,
      BiConsumer<ByteBuf, TieredStorageSubpartitionId> dataListener,
      BiConsumer<Throwable, TieredStorageSubpartitionId> failureListener) {
    this.client = client;
    this.shuffleId = shuffleDescriptor.getShuffleId();
    this.partitionId = shuffleDescriptor.getPartitionId();
    this.inputChannelId = inputChannelId;
    this.subPartitionIndexStart = startSubIdx;
    this.subPartitionIndexEnd = endSubIdx;
    this.dataListener = dataListener;
    this.failureListener = failureListener;
    this.subPartitionRequiredSegmentIds = JavaUtils.newConcurrentHashMap();
    for (int subPartitionId = subPartitionIndexStart;
        subPartitionId <= subPartitionIndexEnd;
        subPartitionId++) {
      subPartitionRequiredSegmentIds.put(subPartitionId, -1);
    }
    this.messageConsumer =
        requestMessage -> {
          // Note that we need to use SubPartitionReadData because the isSegmentGranularityVisible
          // is set as true when opening stream
          if (requestMessage instanceof SubPartitionReadData) {
            dataReceived((SubPartitionReadData) requestMessage);
          } else if (requestMessage instanceof BacklogAnnouncement) {
            backlogReceived(((BacklogAnnouncement) requestMessage).getBacklog());
          } else if (requestMessage instanceof TransportableError) {
            errorReceived(((TransportableError) requestMessage).getErrorMessage());
          } else if (requestMessage instanceof BufferStreamEnd) {
            onStreamEnd((BufferStreamEnd) requestMessage);
          }
        };
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java [86:122]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  public CelebornChannelBufferReader(
      FlinkShuffleClientImpl client,
      ShuffleResourceDescriptor shuffleDescriptor,
      TieredStorageInputChannelId inputChannelId,
      int startSubIdx,
      int endSubIdx,
      BiConsumer<ByteBuf, TieredStorageSubpartitionId> dataListener,
      BiConsumer<Throwable, TieredStorageSubpartitionId> failureListener) {
    this.client = client;
    this.shuffleId = shuffleDescriptor.getShuffleId();
    this.partitionId = shuffleDescriptor.getPartitionId();
    this.inputChannelId = inputChannelId;
    this.subPartitionIndexStart = startSubIdx;
    this.subPartitionIndexEnd = endSubIdx;
    this.dataListener = dataListener;
    this.failureListener = failureListener;
    this.subPartitionRequiredSegmentIds = JavaUtils.newConcurrentHashMap();
    for (int subPartitionId = subPartitionIndexStart;
        subPartitionId <= subPartitionIndexEnd;
        subPartitionId++) {
      subPartitionRequiredSegmentIds.put(subPartitionId, -1);
    }
    this.messageConsumer =
        requestMessage -> {
          // Note that we need to use SubPartitionReadData because the isSegmentGranularityVisible
          // is set as true when opening stream
          if (requestMessage instanceof SubPartitionReadData) {
            dataReceived((SubPartitionReadData) requestMessage);
          } else if (requestMessage instanceof BacklogAnnouncement) {
            backlogReceived(((BacklogAnnouncement) requestMessage).getBacklog());
          } else if (requestMessage instanceof TransportableError) {
            errorReceived(((TransportableError) requestMessage).getErrorMessage());
          } else if (requestMessage instanceof BufferStreamEnd) {
            onStreamEnd((BufferStreamEnd) requestMessage);
          }
        };
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



