in client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java [1223:1470]
/**
 * Pushes several {@link DataBatches.DataBatch}es as a single {@code PushMergedData} request.
 *
 * <p>The bodies of all batches are concatenated into one composite buffer, and {@code offsets[i]}
 * records where batch {@code i} starts inside that buffer. The group is registered in {@code
 * pushState} under a newly allocated grouped batch id, keyed by the "host:port" taken from {@code
 * addressPair.getLeft()}.
 *
 * <p>Responses go through {@code wrappedCallback}:
 *
 * <ul>
 *   <li>A HARD_SPLIT reply builds revive requests and re-pushes the batches asynchronously on
 *       {@code pushDataRetryPool}. The revive count is not decremented.
 *   <li>A primary or replica congestion reply calls {@code pushState.onCongestControl} and then
 *       completes as a success.
 *   <li>Any other reply completes as a success. {@code callback} records a stage end if the status
 *       byte is STAGE_ENDED.
 *   <li>A failure is retried through revive (decrementing the revive count) while {@code
 *       remainReviveTimes > 0}. Otherwise the first failure is recorded in {@code
 *       pushState.exception}.
 * </ul>
 *
 * @param addressPair only the left element is parsed here, as the "host:port" push target; the
 *     pair as a whole appears in log messages
 * @param shuffleId shuffle id, used to build the shuffle key and for revive/logging
 * @param mapId map id
 * @param attemptId map attempt id
 * @param batches batches to push; must be non-empty, because the worker-exclusion check only
 *     looks at {@code batches.get(0).loc}. Presumably all batches share that worker; confirm at
 *     call sites.
 * @param pushState per-map push bookkeeping (in-flight batches, congestion, first exception)
 * @param remainReviveTimes how many more failure-driven revive attempts are allowed
 */
private void doPushMergedData(
    Pair<String, String> addressPair,
    int shuffleId,
    int mapId,
    int attemptId,
    ArrayList<DataBatches.DataBatch> batches,
    PushState pushState,
    int remainReviveTimes) {
  String hostPort = addressPair.getLeft();
  final String[] splits = hostPort.split(":");
  final String host = splits[0];
  final int port = Integer.parseInt(splits[1]);
  int groupedBatchId = pushState.nextBatchId();
  pushState.addBatch(groupedBatchId, hostPort);
  final int numBatches = batches.size();
  final Integer[] partitionIds = new Integer[numBatches];
  final String[] partitionUniqueIds = new String[numBatches];
  final int[] offsets = new int[numBatches];
  final int[] batchIds = new int[numBatches];
  int currentSize = 0;
  CompositeByteBuf byteBuf = Unpooled.compositeBuffer();
  for (int i = 0; i < numBatches; i++) {
    DataBatches.DataBatch batch = batches.get(i);
    partitionIds[i] = batch.loc.getId();
    partitionUniqueIds[i] = batch.loc.getUniqueId();
    // Start offset of this batch's body inside the merged buffer.
    offsets[i] = currentSize;
    batchIds[i] = batch.batchId;
    currentSize += batch.body.length;
    byteBuf.addComponent(true, Unpooled.wrappedBuffer(batch.body));
  }
  NettyManagedBuffer buffer = new NettyManagedBuffer(byteBuf);
  String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
  PushMergedData mergedData =
      new PushMergedData(PRIMARY_MODE, shuffleKey, partitionUniqueIds, offsets, buffer);

  // Terminal callback: finalizes the grouped batch on success, or records the first failure.
  RpcResponseCallback callback =
      new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          logger.debug(
              "Push merged data to {} success for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
              addressPair,
              shuffleId,
              mapId,
              attemptId,
              Arrays.toString(partitionIds),
              groupedBatchId,
              Arrays.toString(batchIds));
          pushState.removeBatch(groupedBatchId, hostPort);
          // Reads from the buffer's current position. Callers that already consumed the status
          // byte without rewinding (the congestion branches) leave nothing to read here.
          if (response.remaining() > 0 && response.get() == StatusCode.STAGE_ENDED.getValue()) {
            stageEndShuffleSet.add(shuffleId);
          }
        }

        @Override
        public void onFailure(Throwable e) {
          String errorMsg =
              String.format(
                  "Push merged data to %s failed for shuffle %d map %d attempt %d partition %s groupedBatch %d batch %s, remain revive times %d.",
                  addressPair,
                  shuffleId,
                  mapId,
                  attemptId,
                  Arrays.toString(partitionIds),
                  groupedBatchId,
                  Arrays.toString(batchIds),
                  remainReviveTimes);
          // Only the first failure is kept; later ones are dropped by compareAndSet.
          pushState.exception.compareAndSet(null, new CelebornIOException(errorMsg, e));
          if (logger.isDebugEnabled()) {
            for (int i = 0; i < numBatches; i++) {
              logger.debug(
                  "Push merged data to {} failed for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}, remain revive times {}.",
                  addressPair,
                  shuffleId,
                  mapId,
                  attemptId,
                  partitionIds[i],
                  groupedBatchId,
                  batchIds[i],
                  remainReviveTimes);
            }
          }
        }
      };

  // Outer callback: interprets the worker's status byte and decides between success, congestion
  // handling, split-driven revive, or failure-driven revive before delegating to `callback`.
  RpcResponseCallback wrappedCallback =
      new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          if (response.remaining() > 0) {
            byte reason = response.get();
            if (reason == StatusCode.HARD_SPLIT.getValue()) {
              logger.info(
                  "Push merged data to {} hard split required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
                  addressPair,
                  shuffleId,
                  mapId,
                  attemptId,
                  Arrays.toString(partitionIds),
                  groupedBatchId,
                  Arrays.toString(batchIds));
              ReviveRequest[] requests =
                  addAndGetReviveRequests(
                      shuffleId, mapId, attemptId, batches, StatusCode.HARD_SPLIT);
              // A hard split is not a failure, so the revive budget is passed on unchanged.
              pushDataRetryPool.submit(
                  () ->
                      submitRetryPushMergedData(
                          pushState,
                          shuffleId,
                          mapId,
                          attemptId,
                          batches,
                          StatusCode.HARD_SPLIT,
                          groupedBatchId,
                          requests,
                          remainReviveTimes,
                          System.currentTimeMillis()
                              + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
                                  .duration()
                                  .toMillis()));
            } else if (reason == StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) {
              logger.debug(
                  "Push merged data to {} primary congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
                  addressPair,
                  shuffleId,
                  mapId,
                  attemptId,
                  Arrays.toString(partitionIds),
                  groupedBatchId,
                  Arrays.toString(batchIds));
              pushState.onCongestControl(hostPort);
              // Not rewound: the status byte has been consumed.
              callback.onSuccess(response);
            } else if (reason == StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue()) {
              logger.debug(
                  "Push merged data to {} replica congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
                  addressPair,
                  shuffleId,
                  mapId,
                  attemptId,
                  Arrays.toString(partitionIds),
                  groupedBatchId,
                  Arrays.toString(batchIds));
              pushState.onCongestControl(hostPort);
              // Not rewound: the status byte has been consumed.
              callback.onSuccess(response);
            } else {
              // StageEnd.
              // Rewind so `callback` can re-read the status byte and check for STAGE_ENDED.
              response.rewind();
              pushState.onSuccess(hostPort);
              callback.onSuccess(response);
            }
          } else {
            pushState.onSuccess(hostPort);
            callback.onSuccess(response);
          }
        }

        @Override
        public void onFailure(Throwable e) {
          StatusCode cause = getPushDataFailCause(e.getMessage());
          // A failure has already been recorded for this map; nothing more to do.
          if (pushState.exception.get() != null) {
            return;
          }
          // Revive budget exhausted: surface the failure.
          if (remainReviveTimes <= 0) {
            if (e instanceof CelebornIOException) {
              callback.onFailure(e);
            } else {
              callback.onFailure(new CelebornIOException(cause, e));
            }
            return;
          }
          logger.error(
              "Push merged data to {} failed for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}, remain revive times {}.",
              addressPair,
              shuffleId,
              mapId,
              attemptId,
              Arrays.toString(partitionIds),
              groupedBatchId,
              Arrays.toString(batchIds),
              remainReviveTimes,
              e);
          if (!mapperEnded(shuffleId, mapId)) {
            ReviveRequest[] requests =
                addAndGetReviveRequests(shuffleId, mapId, attemptId, batches, cause);
            // Failure-driven retry consumes one revive attempt.
            pushDataRetryPool.submit(
                () ->
                    submitRetryPushMergedData(
                        pushState,
                        shuffleId,
                        mapId,
                        attemptId,
                        batches,
                        cause,
                        groupedBatchId,
                        requests,
                        remainReviveTimes - 1,
                        System.currentTimeMillis()
                            + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
                                .duration()
                                .toMillis()));
          } else {
            // The mapper has already finished, so the data is no longer needed; just drop the
            // in-flight bookkeeping for this grouped batch.
            pushState.removeBatch(groupedBatchId, hostPort);
            logger.info(
                "Push merged data to {} failed but mapper already ended for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}, remain revive times {}.",
                addressPair,
                shuffleId,
                mapId,
                attemptId,
                Arrays.toString(partitionIds),
                groupedBatchId,
                Arrays.toString(batchIds),
                remainReviveTimes);
          }
        }
      };

  // do push merged data
  try {
    // isPushTargetWorkerExcluded receives wrappedCallback; presumably it reports the exclusion
    // through that callback itself — TODO confirm in its implementation.
    if (!isPushTargetWorkerExcluded(batches.get(0).loc, wrappedCallback)) {
      if (!testRetryRevive || remainReviveTimes < 1) {
        TransportClient client = dataClientFactory.createClient(host, port);
        client.pushMergedData(mergedData, pushDataTimeout, wrappedCallback);
      } else {
        // Test hook: simulate a push failure to exercise the revive path.
        wrappedCallback.onFailure(
            new CelebornIOException(
                StatusCode.PUSH_DATA_FAIL_NON_CRITICAL_CAUSE,
                new RuntimeException("Mock push merge data failed.")));
      }
    }
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // Catching Exception clears nothing, but it does swallow the interruption; restore the
      // thread's interrupt status so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
    }
    logger.error(
        "Exception raised while pushing merged data for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {} location {}.",
        shuffleId,
        mapId,
        attemptId,
        Arrays.toString(partitionIds),
        groupedBatchId,
        Arrays.toString(batchIds),
        addressPair,
        e);
    wrappedCallback.onFailure(
        new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY, e));
  }
}