client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java [36:79]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class RemoteShuffleEnvironment extends AbstractRemoteShuffleEnvironment
    implements ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate> {

  /** Factory class to create {@link RemoteShuffleResultPartition}. */
  private final RemoteShuffleResultPartitionFactory resultPartitionFactory;

  private final RemoteShuffleInputGateFactory inputGateFactory;

  /**
   * Creates the remote shuffle environment. The buffer pool, partition manager and configuration
   * are passed to the superclass constructor; the two factories are stored on this instance.
   *
   * @param networkBufferPool Network buffer pool for shuffle read and shuffle write.
   * @param resultPartitionManager A trivial {@link ResultPartitionManager}.
   * @param resultPartitionFactory Factory class to create {@link RemoteShuffleResultPartition}.
   * @param inputGateFactory Factory class to create {@link RemoteShuffleInputGate}.
   * @param conf Celeborn configuration, forwarded to the superclass constructor.
   */
  public RemoteShuffleEnvironment(
      NetworkBufferPool networkBufferPool,
      ResultPartitionManager resultPartitionManager,
      RemoteShuffleResultPartitionFactory resultPartitionFactory,
      RemoteShuffleInputGateFactory inputGateFactory,
      CelebornConf conf) {
    super(networkBufferPool, resultPartitionManager, conf);
    this.resultPartitionFactory = resultPartitionFactory;
    this.inputGateFactory = inputGateFactory;
  }

  /**
   * Builds a result partition writer by delegating to the configured result partition factory.
   *
   * @param ownerContext context of the owner; only its owner name is read here.
   * @param index index handed through unchanged to the factory.
   * @param resultPartitionDeploymentDescriptor descriptor handed through unchanged to the factory.
   * @param conf Celeborn configuration handed through unchanged to the factory.
   * @return whatever the result partition factory creates for these arguments.
   */
  @Override
  public ResultPartitionWriter createResultPartitionWriterInternal(
      ShuffleIOOwnerContext ownerContext,
      int index,
      ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor,
      CelebornConf conf) {
    // Resolve the owner name up front so the factory call reads as a plain argument list.
    final String ownerName = ownerContext.getOwnerName();
    return resultPartitionFactory.create(
        ownerName, index, resultPartitionDeploymentDescriptor, conf);
  }

  /**
   * Builds an input gate by delegating to the configured input gate factory.
   *
   * @param ownerContext context of the owner; only its owner name is read here.
   * @param gateIndex gate index handed through unchanged to the factory.
   * @param igdd input gate deployment descriptor handed through unchanged to the factory.
   * @return whatever the input gate factory creates for these arguments.
   */
  @Override
  IndexedInputGate createInputGateInternal(
      ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd) {
    final String ownerName = ownerContext.getOwnerName();
    return inputGateFactory.create(ownerName, gateIndex, igdd);
  }

  @VisibleForTesting
  RemoteShuffleResultPartitionFactory getResultPartitionFactory() {
    return resultPartitionFactory;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.15/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java [36:80]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class RemoteShuffleEnvironment extends AbstractRemoteShuffleEnvironment
    implements ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate> {

  /** Factory class to create {@link RemoteShuffleResultPartition}. */
  private final RemoteShuffleResultPartitionFactory resultPartitionFactory;

  private final RemoteShuffleInputGateFactory inputGateFactory;

  /**
   * Creates the remote shuffle environment. The buffer pool, partition manager and configuration
   * are passed to the superclass constructor; the two factories are stored on this instance.
   *
   * @param networkBufferPool Network buffer pool for shuffle read and shuffle write.
   * @param resultPartitionManager A trivial {@link ResultPartitionManager}.
   * @param resultPartitionFactory Factory class to create {@link RemoteShuffleResultPartition}.
   * @param inputGateFactory Factory class to create {@link RemoteShuffleInputGate}.
   * @param conf Celeborn configuration, forwarded to the superclass constructor.
   */
  public RemoteShuffleEnvironment(
      NetworkBufferPool networkBufferPool,
      ResultPartitionManager resultPartitionManager,
      RemoteShuffleResultPartitionFactory resultPartitionFactory,
      RemoteShuffleInputGateFactory inputGateFactory,
      CelebornConf conf) {

    super(networkBufferPool, resultPartitionManager, conf);
    this.resultPartitionFactory = resultPartitionFactory;
    this.inputGateFactory = inputGateFactory;
  }

  /**
   * Builds a result partition writer by delegating to the configured result partition factory.
   *
   * @param ownerContext context of the owner; only its owner name is read here.
   * @param index index handed through unchanged to the factory.
   * @param resultPartitionDeploymentDescriptor descriptor handed through unchanged to the factory.
   * @param conf Celeborn configuration handed through unchanged to the factory.
   * @return whatever the result partition factory creates for these arguments.
   */
  @Override
  public ResultPartitionWriter createResultPartitionWriterInternal(
      ShuffleIOOwnerContext ownerContext,
      int index,
      ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor,
      CelebornConf conf) {
    // Resolve the owner name up front so the factory call reads as a plain argument list.
    final String ownerName = ownerContext.getOwnerName();
    return resultPartitionFactory.create(
        ownerName, index, resultPartitionDeploymentDescriptor, conf);
  }

  /**
   * Builds an input gate by delegating to the configured input gate factory.
   *
   * @param ownerContext context of the owner; only its owner name is read here.
   * @param gateIndex gate index handed through unchanged to the factory.
   * @param igdd input gate deployment descriptor handed through unchanged to the factory.
   * @return whatever the input gate factory creates for these arguments.
   */
  @Override
  IndexedInputGate createInputGateInternal(
      ShuffleIOOwnerContext ownerContext, int gateIndex, InputGateDeploymentDescriptor igdd) {
    final String ownerName = ownerContext.getOwnerName();
    return inputGateFactory.create(ownerName, gateIndex, igdd);
  }

  @VisibleForTesting
  RemoteShuffleResultPartitionFactory getResultPartitionFactory() {
    return resultPartitionFactory;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



