client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java [63:217]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class RemoteShuffleInputGate extends IndexedInputGate {

  private final RemoteShuffleInputGateDelegation inputGateDelegation;

  public RemoteShuffleInputGate(
      CelebornConf celebornConf,
      String taskName,
      int gateIndex,
      InputGateDeploymentDescriptor gateDescriptor,
      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
      BufferDecompressor bufferDecompressor,
      int numConcurrentReading) {

    inputGateDelegation =
        new RemoteShuffleInputGateDelegation(
            celebornConf,
            taskName,
            gateIndex,
            gateDescriptor,
            bufferPoolFactory,
            bufferDecompressor,
            numConcurrentReading,
            availabilityHelper,
            gateDescriptor.getConsumedSubpartitionIndexRange().getStartIndex(),
            gateDescriptor.getConsumedSubpartitionIndexRange().getEndIndex());
  }

  /** Setup gate and build network connections. */
  @Override
  public void setup() throws IOException {
    inputGateDelegation.setup();
  }

  /** Index of the gate of the corresponding computing task. */
  @Override
  public int getGateIndex() {
    return inputGateDelegation.getGateIndex();
  }

  /** Get number of input channels. A channel is a data flow from one shuffle worker. */
  @Override
  public int getNumberOfInputChannels() {
    return inputGateDelegation.getBufferReaders().size();
  }

  /** Whether reading is finished -- all channels are finished and cached buffers are drained. */
  @Override
  public boolean isFinished() {
    return inputGateDelegation.isFinished();
  }

  @Override
  public Optional<BufferOrEvent> getNext() {
    throw new UnsupportedOperationException("Not implemented (DataSet API is not supported).");
  }

  /** Poll a received {@link BufferOrEvent}. */
  @Override
  public Optional<BufferOrEvent> pollNext() throws IOException {
    return inputGateDelegation.pollNext();
  }

  /** Close all reading channels inside this {@link RemoteShuffleInputGate}. */
  @Override
  public void close() throws Exception {
    inputGateDelegation.close();
  }

  /** Get {@link InputChannelInfo}s of this {@link RemoteShuffleInputGate}. */
  @Override
  public List<InputChannelInfo> getChannelInfos() {
    return inputGateDelegation.getChannelsInfo();
  }

  @Override
  public void requestPartitions() {
    // do-nothing
  }

  @Override
  public void checkpointStarted(CheckpointBarrier barrier) {
    // do-nothing.
  }

  @Override
  public void checkpointStopped(long cancelledCheckpointId) {
    // do-nothing.
  }

  @Override
  public void triggerDebloating() {
    // do-nothing.
  }

  @Override
  public List<InputChannelInfo> getUnfinishedChannels() {
    return Collections.emptyList();
  }

  @Override
  public EndOfDataStatus hasReceivedEndOfData() {
    if (inputGateDelegation.getPendingEndOfDataEvents() > 0) {
      return EndOfDataStatus.NOT_END_OF_DATA;
    } else {
      // Keep compatibility with streaming mode.
      return EndOfDataStatus.DRAINED;
    }
  }

  @Override
  public void finishReadRecoveredState() {
    // do-nothing.
  }

  @Override
  public InputChannel getChannel(int channelIndex) {
    return new FakedRemoteInputChannel(channelIndex);
  }

  @Override
  public void sendTaskEvent(TaskEvent event) {
    throw new FlinkRuntimeException("Method should not be called.");
  }

  @Override
  public void resumeConsumption(InputChannelInfo channelInfo) {
    throw new FlinkRuntimeException("Method should not be called.");
  }

  @Override
  public void acknowledgeAllRecordsProcessed(InputChannelInfo inputChannelInfo) {}

  @Override
  public CompletableFuture<Void> getStateConsumedFuture() {
    return CompletableFuture.completedFuture(null);
  }

  @Override
  public String toString() {
    return String.format(
        "ReadGate [owning task: %s, gate index: %d, descriptor: %s]",
        inputGateDelegation.getTaskName(),
        inputGateDelegation.getGateIndex(),
        inputGateDelegation.getGateDescriptor().toString());
  }

  /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
  private class FakedRemoteInputChannel extends RemoteInputChannel {
    FakedRemoteInputChannel(int channelIndex) {
      super(
          new SingleInputGate(
              "",
              inputGateDelegation.getGateIndex(),
              new IntermediateDataSetID(),
              ResultPartitionType.BLOCKING,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.17/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java [63:217]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class RemoteShuffleInputGate extends IndexedInputGate {

  private final RemoteShuffleInputGateDelegation inputGateDelegation;

  public RemoteShuffleInputGate(
      CelebornConf celebornConf,
      String taskName,
      int gateIndex,
      InputGateDeploymentDescriptor gateDescriptor,
      SupplierWithException<BufferPool, IOException> bufferPoolFactory,
      BufferDecompressor bufferDecompressor,
      int numConcurrentReading) {

    inputGateDelegation =
        new RemoteShuffleInputGateDelegation(
            celebornConf,
            taskName,
            gateIndex,
            gateDescriptor,
            bufferPoolFactory,
            bufferDecompressor,
            numConcurrentReading,
            availabilityHelper,
            gateDescriptor.getConsumedSubpartitionIndexRange().getStartIndex(),
            gateDescriptor.getConsumedSubpartitionIndexRange().getEndIndex());
  }

  /** Setup gate and build network connections. */
  @Override
  public void setup() throws IOException {
    inputGateDelegation.setup();
  }

  /** Index of the gate of the corresponding computing task. */
  @Override
  public int getGateIndex() {
    return inputGateDelegation.getGateIndex();
  }

  /** Get number of input channels. A channel is a data flow from one shuffle worker. */
  @Override
  public int getNumberOfInputChannels() {
    return inputGateDelegation.getBufferReaders().size();
  }

  /** Whether reading is finished -- all channels are finished and cached buffers are drained. */
  @Override
  public boolean isFinished() {
    return inputGateDelegation.isFinished();
  }

  @Override
  public Optional<BufferOrEvent> getNext() {
    throw new UnsupportedOperationException("Not implemented (DataSet API is not supported).");
  }

  /** Poll a received {@link BufferOrEvent}. */
  @Override
  public Optional<BufferOrEvent> pollNext() throws IOException {
    return inputGateDelegation.pollNext();
  }

  /** Close all reading channels inside this {@link RemoteShuffleInputGate}. */
  @Override
  public void close() throws Exception {
    inputGateDelegation.close();
  }

  /** Get {@link InputChannelInfo}s of this {@link RemoteShuffleInputGate}. */
  @Override
  public List<InputChannelInfo> getChannelInfos() {
    return inputGateDelegation.getChannelsInfo();
  }

  @Override
  public void requestPartitions() {
    // do-nothing
  }

  @Override
  public void checkpointStarted(CheckpointBarrier barrier) {
    // do-nothing.
  }

  @Override
  public void checkpointStopped(long cancelledCheckpointId) {
    // do-nothing.
  }

  @Override
  public void triggerDebloating() {
    // do-nothing.
  }

  @Override
  public List<InputChannelInfo> getUnfinishedChannels() {
    return Collections.emptyList();
  }

  @Override
  public EndOfDataStatus hasReceivedEndOfData() {
    if (inputGateDelegation.getPendingEndOfDataEvents() > 0) {
      return EndOfDataStatus.NOT_END_OF_DATA;
    } else {
      // Keep compatibility with streaming mode.
      return EndOfDataStatus.DRAINED;
    }
  }

  @Override
  public void finishReadRecoveredState() {
    // do-nothing.
  }

  @Override
  public InputChannel getChannel(int channelIndex) {
    return new FakedRemoteInputChannel(channelIndex);
  }

  @Override
  public void sendTaskEvent(TaskEvent event) {
    throw new FlinkRuntimeException("Method should not be called.");
  }

  @Override
  public void resumeConsumption(InputChannelInfo channelInfo) {
    throw new FlinkRuntimeException("Method should not be called.");
  }

  @Override
  public void acknowledgeAllRecordsProcessed(InputChannelInfo inputChannelInfo) {}

  @Override
  public CompletableFuture<Void> getStateConsumedFuture() {
    return CompletableFuture.completedFuture(null);
  }

  @Override
  public String toString() {
    return String.format(
        "ReadGate [owning task: %s, gate index: %d, descriptor: %s]",
        inputGateDelegation.getTaskName(),
        inputGateDelegation.getGateIndex(),
        inputGateDelegation.getGateDescriptor().toString());
  }

  /** Accommodation for the incompleteness of Flink pluggable shuffle service. */
  private class FakedRemoteInputChannel extends RemoteInputChannel {
    FakedRemoteInputChannel(int channelIndex) {
      super(
          new SingleInputGate(
              "",
              inputGateDelegation.getGateIndex(),
              new IntermediateDataSetID(),
              ResultPartitionType.BLOCKING,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



