public int pushDataToLocation()

in client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java [199:289]


  public int pushDataToLocation(
      int shuffleId,
      int mapId,
      int attemptId,
      int partitionId,
      ByteBuf data,
      PartitionLocation location,
      Runnable closeCallBack)
      throws IOException {
    // mapKey
    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);

    PushState pushState = getPushState(mapKey);

    // increment batchId
    final int nextBatchId = pushState.nextBatchId();
    int totalLength = data.readableBytes();
    data.markWriterIndex();
    data.writerIndex(0);
    data.writeInt(partitionId);
    data.writeInt(attemptId);
    data.writeInt(nextBatchId);
    data.writeInt(totalLength - BATCH_HEADER_SIZE);
    data.resetWriterIndex();
    logger.debug(
        "Do push data byteBuf size {} for app {} shuffle {} map {} attempt {} reduce {} batch {}.",
        totalLength,
        appUniqueId,
        shuffleId,
        mapId,
        attemptId,
        partitionId,
        nextBatchId);
    // check limit
    limitMaxInFlight(mapKey, pushState, location.hostAndPushPort());

    // add inFlight requests
    pushState.addBatch(nextBatchId, location.hostAndPushPort());

    // build PushData request
    NettyManagedBuffer buffer = new NettyManagedBuffer(data);
    final String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
    PushData pushData = new PushData(PRIMARY_MODE, shuffleKey, location.getUniqueId(), buffer);

    // build callback
    RpcResponseCallback callback =
        new RpcResponseCallback() {
          @Override
          public void onSuccess(ByteBuffer response) {
            pushState.removeBatch(nextBatchId, location.hostAndPushPort());
            logger.debug(
                "Push data byteBuf to {} success for shuffle {} map {} attemptId {} batch {}.",
                location.hostAndPushPort(),
                shuffleId,
                mapId,
                attemptId,
                nextBatchId);
          }

          @Override
          public void onFailure(Throwable e) {
            pushState.removeBatch(nextBatchId, location.hostAndPushPort());
            if (pushState.exception.get() != null) {
              return;
            }
            String errorMsg =
                String.format(
                    "Push data byteBuf to %s failed for shuffle %d map %d attempt %d batch %d.",
                    location.hostAndPushPort(), shuffleId, mapId, attemptId, nextBatchId);
            pushState.exception.compareAndSet(null, new CelebornIOException(errorMsg, e));
          }
        };
    // do push data
    try {
      TransportClient client = createClientWaitingInFlightRequest(location, mapKey, pushState);
      client.pushData(pushData, pushDataTimeout, callback, closeCallBack);
    } catch (Exception e) {
      logger.error(
          "Exception raised while pushing data byteBuf for shuffle {} map {} attempt {} partitionId {} batch {} location {}.",
          shuffleId,
          mapId,
          attemptId,
          partitionId,
          nextBatchId,
          location,
          e);
      callback.onFailure(
          new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY, e));
    }
    return totalLength;
  }