client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferManager.java [64:90]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Offers a buffer to this channel while it is flagged as waiting for floating buffers.
   *
   * <p>The buffer is accepted only when the reader is not closed, the channel is currently marked
   * as waiting for floating buffers, and the queue holds fewer than {@code numRequiredBuffers}
   * entries. On acceptance the buffer is queued, {@code tryRequestBuffers()} is invoked, and the
   * total (one plus its result) is passed to {@code decreaseRequiredCredits} and then to {@code
   * bufferReader.notifyAvailableCredits}.
   *
   * <p>Any {@link Throwable} raised while doing so is not propagated; it is logged and reported
   * to the reader through {@code errorReceived}.
   *
   * @param buffer the buffer being offered to this channel
   * @return {@code true} if the buffer was added to the queue, {@code false} otherwise
   */
  public boolean notifyBufferAvailable(Buffer buffer) {
    // Early exit without taking the queue lock; the closed state is re-checked under the lock.
    if (bufferReader.isClosed()) {
      return false;
    }
    int numBuffers = 0;
    boolean isBufferUsed = false;
    try {
      synchronized (bufferQueue) {
        if (!isWaitingForFloatingBuffers) {
          logger.warn("This channel should be waiting for floating buffers.");
          return false;
        }
        // The waiting flag is cleared even if the buffer ends up being rejected below.
        isWaitingForFloatingBuffers = false;
        if (bufferReader.isClosed() || bufferQueue.size() >= numRequiredBuffers) {
          return false;
        }
        bufferQueue.add(buffer);
        isBufferUsed = true;
        // Count the offered buffer plus whatever tryRequestBuffers() reports.
        numBuffers = 1 + tryRequestBuffers();
        decreaseRequiredCredits(numBuffers);
      }
      // The reader is notified only after the queue lock has been released.
      bufferReader.notifyAvailableCredits(numBuffers);
    } catch (Throwable t) {
      // Keep the stack trace in the log: errorReceived only accepts a message string.
      logger.error("Failed to process the buffer availability notification.", t);
      // getLocalizedMessage() may be null; fall back to toString() so the exception type is
      // still reported.
      String errorMessage = t.getLocalizedMessage();
      bufferReader.errorReceived(errorMessage != null ? errorMessage : t.toString());
    }
    return isBufferUsed;
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



client-flink/flink-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferManager.java [64:90]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  /**
   * Offers a buffer to this channel while it is flagged as waiting for floating buffers.
   *
   * <p>The buffer is accepted only when the reader is not closed, the channel is currently marked
   * as waiting for floating buffers, and the queue holds fewer than {@code numRequiredBuffers}
   * entries. On acceptance the buffer is queued, {@code tryRequestBuffers()} is invoked, and the
   * total (one plus its result) is passed to {@code decreaseRequiredCredits} and then to {@code
   * bufferReader.notifyAvailableCredits}.
   *
   * <p>Any {@link Throwable} raised while doing so is not propagated; it is logged and reported
   * to the reader through {@code errorReceived}.
   *
   * @param buffer the buffer being offered to this channel
   * @return {@code true} if the buffer was added to the queue, {@code false} otherwise
   */
  public boolean notifyBufferAvailable(Buffer buffer) {
    // Early exit without taking the queue lock; the closed state is re-checked under the lock.
    if (bufferReader.isClosed()) {
      return false;
    }
    int numBuffers = 0;
    boolean isBufferUsed = false;
    try {
      synchronized (bufferQueue) {
        if (!isWaitingForFloatingBuffers) {
          logger.warn("This channel should be waiting for floating buffers.");
          return false;
        }
        // The waiting flag is cleared even if the buffer ends up being rejected below.
        isWaitingForFloatingBuffers = false;
        if (bufferReader.isClosed() || bufferQueue.size() >= numRequiredBuffers) {
          return false;
        }
        bufferQueue.add(buffer);
        isBufferUsed = true;
        // Count the offered buffer plus whatever tryRequestBuffers() reports.
        numBuffers = 1 + tryRequestBuffers();
        decreaseRequiredCredits(numBuffers);
      }
      // The reader is notified only after the queue lock has been released.
      bufferReader.notifyAvailableCredits(numBuffers);
    } catch (Throwable t) {
      // Keep the stack trace in the log: errorReceived only accepts a message string.
      logger.error("Failed to process the buffer availability notification.", t);
      // getLocalizedMessage() may be null; fall back to toString() so the exception type is
      // still reported.
      String errorMessage = t.getLocalizedMessage();
      bufferReader.errorReceived(errorMessage != null ? errorMessage : t.toString());
    }
    return isBufferUsed;
  }
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



