client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java [59:178]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {

  /**
   * Creates the input gate by handing every argument, unchanged and in the same order, to the
   * {@link AbstractRemoteShuffleInputGate} constructor. Nothing else is initialized here.
   *
   * @param celebornConf Celeborn configuration, passed to the superclass
   * @param taskName task name, passed to the superclass
   * @param gateIndex index of this gate, passed to the superclass
   * @param gateDescriptor deployment descriptor of this gate, passed to the superclass
   * @param bufferPoolFactory supplier that yields a {@link BufferPool} and may throw {@link
   *     IOException}; passed to the superclass
   * @param bufferDecompressor buffer decompressor, passed to the superclass
   * @param numConcurrentReading passed to the superclass; presumably a limit on concurrent reads
   *     (TODO confirm against the superclass)
   */
  public RemoteShuffleInputGate(
      CelebornConf celebornConf,
      String taskName,
      int gateIndex,
      InputGateDeploymentDescriptor gateDescriptor,
      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
      BufferDecompressor bufferDecompressor,
      int numConcurrentReading) {
    super(
        celebornConf,
        taskName,
        gateIndex,
        gateDescriptor,
        bufferPoolFactory,
        bufferDecompressor,
        numConcurrentReading);
  }

  /**
   * Returns a placeholder channel for the given index.
   *
   * <p>Every call constructs a new {@link FakedRemoteInputChannel}; no instance is cached here.
   *
   * @param channelIndex index handed to the placeholder channel
   * @return a newly created {@link FakedRemoteInputChannel}
   */
  @Override
  public InputChannel getChannel(int channelIndex) {
    final InputChannel placeholderChannel = new FakedRemoteInputChannel(channelIndex);
    return placeholderChannel;
  }

  /**
   * Converts the consumed subpartition {@link IndexRange} of a gate descriptor into a pair.
   *
   * @param gateDescriptor descriptor whose consumed subpartition index range is read
   * @return a tuple holding the range's start index in {@code f0} and its end index in {@code f1}
   */
  @Override
  public Tuple2<Integer, Integer> getConsumedSubpartitionIndexRange(
      InputGateDeploymentDescriptor gateDescriptor) {
    final IndexRange consumedRange = gateDescriptor.getConsumedSubpartitionIndexRange();
    final int startIndex = consumedRange.getStartIndex();
    final int endIndex = consumedRange.getEndIndex();
    return Tuple2.of(startIndex, endIndex);
  }

  /**
   * Accommodation for the incompleteness of Flink pluggable shuffle service.
   *
   * <p>A {@link RemoteInputChannel} subclass assembled entirely from placeholder collaborators. It
   * is a non-static inner class: its constructor reads {@code inputGateDelegation} from the
   * enclosing scope. NOTE(review): the channel does not look meant to transfer data — confirm
   * with the callers of {@code getChannel}.
   */
  private class FakedRemoteInputChannel extends RemoteInputChannel {
    /**
     * Builds the placeholder channel.
     *
     * @param channelIndex index handed to the {@link RemoteInputChannel} constructor
     */
    FakedRemoteInputChannel(int channelIndex) {
      // Flink 1.19.0
      // [FLINK-25055][network] Support listen and notify mechanism for partition request
      // [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel
      super(
          // A new SingleInputGate is created for every channel. It reuses the task name and gate
          // index of inputGateDelegation; everything else is a stub (fresh data set id, a no-op
          // lambda, a supplier returning null, null arguments, FakedMemorySegmentProvider).
          new SingleInputGate(
              inputGateDelegation.getTaskName(),
              inputGateDelegation.getGateIndex(),
              new IntermediateDataSetID(),
              ResultPartitionType.BLOCKING,
              1,
              (a, b, c) -> {},
              () -> null,
              null,
              new FakedMemorySegmentProvider(),
              0,
              new ThroughputCalculator(SystemClock.getInstance()),
              null),
          channelIndex,
          // Freshly generated partition id; it is not derived from any gate descriptor.
          new ResultPartitionID(),
          new ResultSubpartitionIndexSet(new IndexRange(0, 0)),
          // Connection target: a generated resource id on the loopback address.
          new ConnectionID(
              new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 1),
              0),
          new LocalConnectionManager(),
          // Four int settings, all zero. NOTE(review): these appear to be initial backoff, max
          // backoff, partition request listener timeout and buffers per channel — confirm against
          // the RemoteInputChannel constructor of the Flink version in use.
          0,
          0,
          0,
          0,
          // Two separate counters that are not shared with anything outside this channel.
          new SimpleCounter(),
          new SimpleCounter(),
          new FakedChannelStateWriter());
    }
  }

  /**
   * Accommodation for the incompleteness of Flink pluggable shuffle service.
   *
   * <p>Stub {@link MemorySegmentProvider} that never hands out memory: requests yield {@code null}
   * and recycling is ignored.
   */
  private static class FakedMemorySegmentProvider implements MemorySegmentProvider {

    /**
     * Always returns {@code null}; the argument is ignored and nothing is allocated.
     *
     * @param i ignored
     * @return always {@code null}, never a collection
     */
    @Override
    public Collection<MemorySegment> requestUnpooledMemorySegments(int i) throws IOException {
      return null;
    }

    /**
     * Does nothing; the given segments are neither inspected nor retained.
     *
     * @param collection ignored
     */
    @Override
    public void recycleUnpooledMemorySegments(Collection<MemorySegment> collection)
        throws IOException {}
  }

  /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
  private static class FakedChannelStateWriter implements ChannelStateWriter {

    /** No-op: the checkpoint id and options are ignored. */
    @Override
    public void start(long cpId, CheckpointOptions checkpointOptions) {}

    /** No-op: all arguments are ignored; the {@code data} iterator is neither read nor closed. */
    @Override
    public void addInputData(
        long cpId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data) {}

    /** No-op: all arguments are ignored; the passed buffers are not touched. */
    @Override
    public void addOutputData(
        long cpId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {}

    /**
     * No-op: all arguments are ignored and the future is never observed. Although the signature
     * declares {@link IllegalArgumentException}, this implementation never throws it.
     */
    @Override
    public void addOutputDataFuture(
        long l,
        ResultSubpartitionInfo resultSubpartitionInfo,
        int i,
        CompletableFuture<List<Buffer>> completableFuture)
        throws IllegalArgumentException {}

    /** No-op: the checkpoint id is ignored. */
    @Override
    public void finishInput(long checkpointId) {}

    /** No-op: the checkpoint id is ignored. */
    @Override
    public void finishOutput(long checkpointId) {}

    /** No-op: the checkpoint id, cause and cleanup flag are all ignored. */
    @Override
    public void abort(long checkpointId, Throwable cause, boolean cleanup) {}

    /**
     * Always returns {@code null}; the checkpoint id is ignored and nothing is removed.
     *
     * @param checkpointId ignored
     * @return always {@code null}
     */
    @Override
    public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
      return null;
    }

    /** No-op. */
    @Override
    public void close() {}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java [59:178]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {

  /**
   * Creates the input gate by handing every argument, unchanged and in the same order, to the
   * {@link AbstractRemoteShuffleInputGate} constructor. Nothing else is initialized here.
   *
   * @param celebornConf Celeborn configuration, passed to the superclass
   * @param taskName task name, passed to the superclass
   * @param gateIndex index of this gate, passed to the superclass
   * @param gateDescriptor deployment descriptor of this gate, passed to the superclass
   * @param bufferPoolFactory supplier that yields a {@link BufferPool} and may throw {@link
   *     IOException}; passed to the superclass
   * @param bufferDecompressor buffer decompressor, passed to the superclass
   * @param numConcurrentReading passed to the superclass; presumably a limit on concurrent reads
   *     (TODO confirm against the superclass)
   */
  public RemoteShuffleInputGate(
      CelebornConf celebornConf,
      String taskName,
      int gateIndex,
      InputGateDeploymentDescriptor gateDescriptor,
      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
      BufferDecompressor bufferDecompressor,
      int numConcurrentReading) {
    super(
        celebornConf,
        taskName,
        gateIndex,
        gateDescriptor,
        bufferPoolFactory,
        bufferDecompressor,
        numConcurrentReading);
  }

  /**
   * Returns a placeholder channel for the given index.
   *
   * <p>Every call constructs a new {@link FakedRemoteInputChannel}; no instance is cached here.
   *
   * @param channelIndex index handed to the placeholder channel
   * @return a newly created {@link FakedRemoteInputChannel}
   */
  @Override
  public InputChannel getChannel(int channelIndex) {
    final InputChannel placeholderChannel = new FakedRemoteInputChannel(channelIndex);
    return placeholderChannel;
  }

  /**
   * Converts the consumed subpartition {@link IndexRange} of a gate descriptor into a pair.
   *
   * @param gateDescriptor descriptor whose consumed subpartition index range is read
   * @return a tuple holding the range's start index in {@code f0} and its end index in {@code f1}
   */
  @Override
  public Tuple2<Integer, Integer> getConsumedSubpartitionIndexRange(
      InputGateDeploymentDescriptor gateDescriptor) {
    final IndexRange consumedRange = gateDescriptor.getConsumedSubpartitionIndexRange();
    final int startIndex = consumedRange.getStartIndex();
    final int endIndex = consumedRange.getEndIndex();
    return Tuple2.of(startIndex, endIndex);
  }

  /**
   * Accommodation for the incompleteness of Flink pluggable shuffle service.
   *
   * <p>A {@link RemoteInputChannel} subclass assembled entirely from placeholder collaborators. It
   * is a non-static inner class: its constructor reads {@code inputGateDelegation} from the
   * enclosing scope. NOTE(review): the channel does not look meant to transfer data — confirm
   * with the callers of {@code getChannel}.
   */
  private class FakedRemoteInputChannel extends RemoteInputChannel {
    /**
     * Builds the placeholder channel.
     *
     * @param channelIndex index handed to the {@link RemoteInputChannel} constructor
     */
    FakedRemoteInputChannel(int channelIndex) {
      // NOTE(review): the version below reads 1.19.0 although this file belongs to the flink-1.20
      // module; presumably the constructor signature is unchanged in 1.20 — confirm.
      // Flink 1.19.0
      // [FLINK-25055][network] Support listen and notify mechanism for partition request
      // [FLINK-33743][runtime] Support consuming multiple subpartitions on a single channel
      super(
          // A new SingleInputGate is created for every channel. It reuses the task name and gate
          // index of inputGateDelegation; everything else is a stub (fresh data set id, a no-op
          // lambda, a supplier returning null, null arguments, FakedMemorySegmentProvider).
          new SingleInputGate(
              inputGateDelegation.getTaskName(),
              inputGateDelegation.getGateIndex(),
              new IntermediateDataSetID(),
              ResultPartitionType.BLOCKING,
              1,
              (a, b, c) -> {},
              () -> null,
              null,
              new FakedMemorySegmentProvider(),
              0,
              new ThroughputCalculator(SystemClock.getInstance()),
              null),
          channelIndex,
          // Freshly generated partition id; it is not derived from any gate descriptor.
          new ResultPartitionID(),
          new ResultSubpartitionIndexSet(new IndexRange(0, 0)),
          // Connection target: a generated resource id on the loopback address.
          new ConnectionID(
              new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 1),
              0),
          new LocalConnectionManager(),
          // Four int settings, all zero. NOTE(review): these appear to be initial backoff, max
          // backoff, partition request listener timeout and buffers per channel — confirm against
          // the RemoteInputChannel constructor of the Flink version in use.
          0,
          0,
          0,
          0,
          // Two separate counters that are not shared with anything outside this channel.
          new SimpleCounter(),
          new SimpleCounter(),
          new FakedChannelStateWriter());
    }
  }

  /**
   * Accommodation for the incompleteness of Flink pluggable shuffle service.
   *
   * <p>Stub {@link MemorySegmentProvider} that never hands out memory: requests yield {@code null}
   * and recycling is ignored.
   */
  private static class FakedMemorySegmentProvider implements MemorySegmentProvider {

    /**
     * Always returns {@code null}; the argument is ignored and nothing is allocated.
     *
     * @param i ignored
     * @return always {@code null}, never a collection
     */
    @Override
    public Collection<MemorySegment> requestUnpooledMemorySegments(int i) throws IOException {
      return null;
    }

    /**
     * Does nothing; the given segments are neither inspected nor retained.
     *
     * @param collection ignored
     */
    @Override
    public void recycleUnpooledMemorySegments(Collection<MemorySegment> collection)
        throws IOException {}
  }

  /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
  private static class FakedChannelStateWriter implements ChannelStateWriter {

    /** No-op: the checkpoint id and options are ignored. */
    @Override
    public void start(long cpId, CheckpointOptions checkpointOptions) {}

    /** No-op: all arguments are ignored; the {@code data} iterator is neither read nor closed. */
    @Override
    public void addInputData(
        long cpId, InputChannelInfo info, int startSeqNum, CloseableIterator<Buffer> data) {}

    /** No-op: all arguments are ignored; the passed buffers are not touched. */
    @Override
    public void addOutputData(
        long cpId, ResultSubpartitionInfo info, int startSeqNum, Buffer... data) {}

    /**
     * No-op: all arguments are ignored and the future is never observed. Although the signature
     * declares {@link IllegalArgumentException}, this implementation never throws it.
     */
    @Override
    public void addOutputDataFuture(
        long l,
        ResultSubpartitionInfo resultSubpartitionInfo,
        int i,
        CompletableFuture<List<Buffer>> completableFuture)
        throws IllegalArgumentException {}

    /** No-op: the checkpoint id is ignored. */
    @Override
    public void finishInput(long checkpointId) {}

    /** No-op: the checkpoint id is ignored. */
    @Override
    public void finishOutput(long checkpointId) {}

    /** No-op: the checkpoint id, cause and cleanup flag are all ignored. */
    @Override
    public void abort(long checkpointId, Throwable cause, boolean cleanup) {}

    /**
     * Always returns {@code null}; the checkpoint id is ignored and nothing is removed.
     *
     * @param checkpointId ignored
     * @return always {@code null}
     */
    @Override
    public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
      return null;
    }

    /** No-op. */
    @Override
    public void close() {}
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



