client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferManager.java [37:163]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class CelebornChannelBufferManager implements BufferListener, BufferRecycler {

  private static Logger logger = LoggerFactory.getLogger(CelebornChannelBufferManager.class);

  /** The queue to hold the available buffer when the reader is waiting for buffers. */
  private final Queue<Buffer> bufferQueue;

  private final TieredStorageMemoryManager memoryManager;

  private final CelebornChannelBufferReader bufferReader;

  /** The tag indicates whether it is waiting for buffers from the buffer pool. */
  @GuardedBy("bufferQueue")
  private boolean isWaitingForFloatingBuffers;

  /** The total number of required buffers for the respective input channel. */
  @GuardedBy("bufferQueue")
  private int numRequiredBuffers = 0;

  public CelebornChannelBufferManager(
      TieredStorageMemoryManager memoryManager, CelebornChannelBufferReader bufferReader) {
    this.memoryManager = checkNotNull(memoryManager);
    this.bufferReader = checkNotNull(bufferReader);
    this.bufferQueue = new LinkedList<>();
  }

  @Override
  public boolean notifyBufferAvailable(Buffer buffer) {
    if (bufferReader.isClosed()) {
      return false;
    }
    int numBuffers = 0;
    boolean isBufferUsed = false;
    try {
      synchronized (bufferQueue) {
        if (!isWaitingForFloatingBuffers) {
          logger.warn("This channel should be waiting for floating buffers.");
          return false;
        }
        isWaitingForFloatingBuffers = false;
        if (bufferReader.isClosed() || bufferQueue.size() >= numRequiredBuffers) {
          return false;
        }
        bufferQueue.add(buffer);
        isBufferUsed = true;
        numBuffers = 1 + tryRequestBuffers();
        decreaseRequiredCredits(numBuffers);
      }
      bufferReader.notifyAvailableCredits(numBuffers);
    } catch (Throwable t) {
      bufferReader.errorReceived(t.getLocalizedMessage());
    }
    return isBufferUsed;
  }

  public void decreaseRequiredCredits(int numCredits) {
    synchronized (bufferQueue) {
      numRequiredBuffers -= numCredits;
    }
  }

  @Override
  public void notifyBufferDestroyed() {
    // noop
  }

  @Override
  public void recycle(MemorySegment segment) {
    try {
      memoryManager.getBufferPool().recycle(segment);
    } catch (Throwable t) {
      ExceptionUtils.rethrow(t);
    }
  }

  Buffer requestBuffer() {
    synchronized (bufferQueue) {
      return bufferQueue.poll();
    }
  }

  int requestBuffers(int numRequired) {
    int numRequestedBuffers = 0;
    synchronized (bufferQueue) {
      if (bufferReader.isClosed()) {
        return numRequestedBuffers;
      }
      numRequiredBuffers += numRequired;
      numRequestedBuffers = tryRequestBuffers();
    }
    return numRequestedBuffers;
  }

  int tryRequestBuffersIfNeeded() {
    synchronized (bufferQueue) {
      if (numRequiredBuffers > 0 && !isWaitingForFloatingBuffers && bufferQueue.isEmpty()) {
        return tryRequestBuffers();
      }
      return 0;
    }
  }

  void close() {
    synchronized (bufferQueue) {
      for (Buffer buffer : bufferQueue) {
        buffer.recycleBuffer();
      }
      bufferQueue.clear();
    }
  }

  @GuardedBy("bufferQueue")
  private int tryRequestBuffers() {
    assert Thread.holdsLock(bufferQueue);
    int numRequestedBuffers = 0;
    while (bufferQueue.size() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
      BufferPool bufferPool = memoryManager.getBufferPool();
      Buffer buffer = bufferPool.requestBuffer();
      if (buffer != null) {
        bufferQueue.add(buffer);
        numRequestedBuffers++;
      } else if (bufferPool.addBufferListener(this)) {
        isWaitingForFloatingBuffers = true;
        break;
      }
    }
    return numRequestedBuffers;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferManager.java [37:163]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class CelebornChannelBufferManager implements BufferListener, BufferRecycler {

  private static Logger logger = LoggerFactory.getLogger(CelebornChannelBufferManager.class);

  /** The queue to hold the available buffer when the reader is waiting for buffers. */
  private final Queue<Buffer> bufferQueue;

  private final TieredStorageMemoryManager memoryManager;

  private final CelebornChannelBufferReader bufferReader;

  /** The tag indicates whether it is waiting for buffers from the buffer pool. */
  @GuardedBy("bufferQueue")
  private boolean isWaitingForFloatingBuffers;

  /** The total number of required buffers for the respective input channel. */
  @GuardedBy("bufferQueue")
  private int numRequiredBuffers = 0;

  public CelebornChannelBufferManager(
      TieredStorageMemoryManager memoryManager, CelebornChannelBufferReader bufferReader) {
    this.memoryManager = checkNotNull(memoryManager);
    this.bufferReader = checkNotNull(bufferReader);
    this.bufferQueue = new LinkedList<>();
  }

  @Override
  public boolean notifyBufferAvailable(Buffer buffer) {
    if (bufferReader.isClosed()) {
      return false;
    }
    int numBuffers = 0;
    boolean isBufferUsed = false;
    try {
      synchronized (bufferQueue) {
        if (!isWaitingForFloatingBuffers) {
          logger.warn("This channel should be waiting for floating buffers.");
          return false;
        }
        isWaitingForFloatingBuffers = false;
        if (bufferReader.isClosed() || bufferQueue.size() >= numRequiredBuffers) {
          return false;
        }
        bufferQueue.add(buffer);
        isBufferUsed = true;
        numBuffers = 1 + tryRequestBuffers();
        decreaseRequiredCredits(numBuffers);
      }
      bufferReader.notifyAvailableCredits(numBuffers);
    } catch (Throwable t) {
      bufferReader.errorReceived(t.getLocalizedMessage());
    }
    return isBufferUsed;
  }

  public void decreaseRequiredCredits(int numCredits) {
    synchronized (bufferQueue) {
      numRequiredBuffers -= numCredits;
    }
  }

  @Override
  public void notifyBufferDestroyed() {
    // noop
  }

  @Override
  public void recycle(MemorySegment segment) {
    try {
      memoryManager.getBufferPool().recycle(segment);
    } catch (Throwable t) {
      ExceptionUtils.rethrow(t);
    }
  }

  Buffer requestBuffer() {
    synchronized (bufferQueue) {
      return bufferQueue.poll();
    }
  }

  int requestBuffers(int numRequired) {
    int numRequestedBuffers = 0;
    synchronized (bufferQueue) {
      if (bufferReader.isClosed()) {
        return numRequestedBuffers;
      }
      numRequiredBuffers += numRequired;
      numRequestedBuffers = tryRequestBuffers();
    }
    return numRequestedBuffers;
  }

  int tryRequestBuffersIfNeeded() {
    synchronized (bufferQueue) {
      if (numRequiredBuffers > 0 && !isWaitingForFloatingBuffers && bufferQueue.isEmpty()) {
        return tryRequestBuffers();
      }
      return 0;
    }
  }

  void close() {
    synchronized (bufferQueue) {
      for (Buffer buffer : bufferQueue) {
        buffer.recycleBuffer();
      }
      bufferQueue.clear();
    }
  }

  @GuardedBy("bufferQueue")
  private int tryRequestBuffers() {
    assert Thread.holdsLock(bufferQueue);
    int numRequestedBuffers = 0;
    while (bufferQueue.size() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
      BufferPool bufferPool = memoryManager.getBufferPool();
      Buffer buffer = bufferPool.requestBuffer();
      if (buffer != null) {
        bufferQueue.add(buffer);
        numRequestedBuffers++;
      } else if (bufferPool.addBufferListener(this)) {
        isWaitingForFloatingBuffers = true;
        break;
      }
    }
    return numRequestedBuffers;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



