in client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java [199:289]
/**
 * Stamps the batch header into {@code data} and pushes the buffer to {@code location}.
 *
 * <p>The header consists of four ints written from index 0 of {@code data}: the partition id,
 * the attempt id, a batch id freshly obtained from the map attempt's {@code PushState}, and the
 * readable length minus {@code BATCH_HEADER_SIZE}. The writer index is restored afterwards, so
 * the readable length of the buffer is unchanged. NOTE(review): this presumably relies on the
 * caller having reserved {@code BATCH_HEADER_SIZE} bytes at the front of {@code data} — confirm
 * against the callers.
 *
 * <p>An exception raised while obtaining the client or issuing the push is not rethrown: it is
 * logged and routed through the callback's {@code onFailure}, which records the first failure in
 * {@code pushState.exception}. The method still returns normally in that case.
 *
 * @param shuffleId shuffle the data belongs to
 * @param mapId map task producing the data
 * @param attemptId attempt of the map task; also written into the batch header
 * @param partitionId partition id written into the batch header
 * @param data buffer to push; its first four ints are overwritten with the batch header
 * @param location target partition location; its push endpoint and unique id address the request
 * @param closeCallBack forwarded unchanged to {@code TransportClient#pushData}
 * @return the number of readable bytes in {@code data}, captured before the header is written
 * @throws IOException declared by the signature; the push attempt itself is guarded, so this
 *     presumably originates from the in-flight limit check — TODO confirm which callee throws
 */
public int pushDataToLocation(
    int shuffleId,
    int mapId,
    int attemptId,
    int partitionId,
    ByteBuf data,
    PartitionLocation location,
    Runnable closeCallBack)
    throws IOException {
  final String mapAttemptKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
  final PushState state = getPushState(mapAttemptKey);

  // The batch id is embedded in the header, so it has to be allocated before the header write.
  final int batchId = state.nextBatchId();
  final int pushedBytes = data.readableBytes();

  // Rewind the writer to index 0, fill in the header, then restore the writer index so the
  // payload that follows the header stays readable.
  data.markWriterIndex();
  data.writerIndex(0);
  data.writeInt(partitionId);
  data.writeInt(attemptId);
  data.writeInt(batchId);
  data.writeInt(pushedBytes - BATCH_HEADER_SIZE);
  data.resetWriterIndex();

  logger.debug(
      "Do push data byteBuf size {} for app {} shuffle {} map {} attempt {} reduce {} batch {}.",
      pushedBytes, appUniqueId, shuffleId, mapId, attemptId, partitionId, batchId);

  // Apply the in-flight limit for this target first; only then register the new batch.
  limitMaxInFlight(mapAttemptKey, state, location.hostAndPushPort());
  state.addBatch(batchId, location.hostAndPushPort());

  // Assemble the PushData request around the (now header-stamped) buffer.
  final NettyManagedBuffer payload = new NettyManagedBuffer(data);
  final String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
  final PushData request =
      new PushData(PRIMARY_MODE, shuffleKey, location.getUniqueId(), payload);

  final RpcResponseCallback responseCallback =
      new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          // The batch is no longer in flight once the push has been acknowledged.
          state.removeBatch(batchId, location.hostAndPushPort());
          logger.debug(
              "Push data byteBuf to {} success for shuffle {} map {} attemptId {} batch {}.",
              location.hostAndPushPort(), shuffleId, mapId, attemptId, batchId);
        }

        @Override
        public void onFailure(Throwable e) {
          // A failed batch is no longer in flight either.
          state.removeBatch(batchId, location.hostAndPushPort());
          // Only the first failure is recorded; later ones leave the stored exception untouched.
          if (state.exception.get() == null) {
            String failureMessage =
                String.format(
                    "Push data byteBuf to %s failed for shuffle %d map %d attempt %d batch %d.",
                    location.hostAndPushPort(), shuffleId, mapId, attemptId, batchId);
            state.exception.compareAndSet(null, new CelebornIOException(failureMessage, e));
          }
        }
      };

  try {
    TransportClient pushClient =
        createClientWaitingInFlightRequest(location, mapAttemptKey, state);
    pushClient.pushData(request, pushDataTimeout, responseCallback, closeCallBack);
  } catch (Exception pushError) {
    // A synchronous failure takes the same path as an asynchronous one, so the in-flight
    // bookkeeping and the recorded exception stay consistent.
    logger.error(
        "Exception raised while pushing data byteBuf for shuffle {} map {} attempt {} partitionId {} batch {} location {}.",
        shuffleId, mapId, attemptId, partitionId, batchId, location, pushError);
    responseCallback.onFailure(
        new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY, pushError));
  }
  return pushedBytes;
}