in client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java [327:455]
/**
 * Re-pushes the batches of a failed merged push once revive results for their partitions are
 * available.
 *
 * <p>{@code reviveRequests[i]} is treated as belonging to {@code batches.get(i)}; both are indexed
 * with the same position. Requests are inspected in order. While the current request is still
 * {@code REVIVE_INITIALIZED}, this thread sleeps in 50 ms steps until {@code
 * reviveResponseDueTime} has been reached. For each resolved request:
 *
 * <ul>
 *   <li>if the mapper has already ended, the batch is dropped (debug log only);
 *   <li>if the revive succeeded, the batch is regrouped under the partition's current location
 *       from {@code reducePartitionMap} and later re-pushed via {@code doPushMergedData};
 *   <li>otherwise the batch is queued for another revive round when {@code remainReviveTimes > 0}.
 *       If no rounds remain, a {@link CelebornIOException} is recorded in {@code
 *       pushState.exception} and the method returns without pushing anything.
 * </ul>
 *
 * <p>Requests that are still unresolved when the deadline passes are treated like failed revives.
 * When no batch needs another round, the old grouped batch is removed from {@code pushState}.
 * Otherwise new revive requests are issued for the failed batches, and this method is resubmitted
 * to {@code pushDataRetryPool} with {@code remainReviveTimes - 1} and a fresh deadline.
 *
 * @param pushState push state of this map attempt; receives the failure exception and tracks the
 *     in-flight grouped batch
 * @param shuffleId shuffle id
 * @param mapId map id
 * @param attemptId map attempt id
 * @param batches batches of the failed merged push, index-aligned with {@code reviveRequests}
 * @param cause status code that triggered the revive; used in error messages and follow-up revives
 * @param oldGroupedBatchId id of the failed grouped batch registered in {@code pushState}
 * @param reviveRequests revive requests whose {@code reviveStatus} is polled
 * @param remainReviveTimes number of further revive rounds allowed
 * @param reviveResponseDueTime absolute deadline in epoch millis (compared against {@link
 *     System#currentTimeMillis()}) for waiting on revive responses
 */
private void submitRetryPushMergedData(
    PushState pushState,
    int shuffleId,
    int mapId,
    int attemptId,
    ArrayList<DataBatches.DataBatch> batches,
    StatusCode cause,
    Integer oldGroupedBatchId,
    ReviveRequest[] reviveRequests,
    int remainReviveTimes,
    long reviveResponseDueTime) {
  HashMap<Pair<String, String>, DataBatches> newDataBatchesMap = new HashMap<>();
  ArrayList<DataBatches.DataBatch> reviveFailedBatches = new ArrayList<>();
  long reviveWaitTime = reviveResponseDueTime - System.currentTimeMillis();
  final long delta = 50;
  long accumulatedTime = 0;
  int index = 0;
  // Walk the requests in order, blocking on the first one whose revive result is not yet known.
  while (index < reviveRequests.length && accumulatedTime <= reviveWaitTime) {
    ReviveRequest request = reviveRequests[index];
    DataBatches.DataBatch batch = batches.get(index);
    if (request.reviveStatus != StatusCode.REVIVE_INITIALIZED.getValue()) {
      if (mapperEnded(shuffleId, mapId)) {
        logger.debug(
            "Revive for push merged data success, but the mapper already ended for shuffle {} map {} attempt {} partition {} batch {}.",
            shuffleId,
            mapId,
            attemptId,
            request.partitionId,
            oldGroupedBatchId);
      } else if (request.reviveStatus == StatusCode.SUCCESS.getValue()) {
        PartitionLocation newLoc = reducePartitionMap.get(shuffleId).get(request.partitionId);
        DataBatches newDataBatches =
            newDataBatchesMap.computeIfAbsent(genAddressPair(newLoc), (s) -> new DataBatches());
        newDataBatches.addDataBatch(newLoc, batch.batchId, batch.body);
      } else if (remainReviveTimes > 0) {
        reviveFailedBatches.add(batch);
      } else {
        recordMergedDataReviveFailure(
            pushState, shuffleId, mapId, attemptId, request, batch, cause, oldGroupedBatchId);
        return;
      }
      index++;
    } else {
      try {
        Thread.sleep(delta);
      } catch (InterruptedException e) {
        // NOTE(review): with the interrupt flag restored, later sleeps throw immediately, so this
        // loop spins (and logs) without sleeping until reviveWaitTime is exceeded.
        logger.error("Interrupted while waiting for Revive result!");
        Thread.currentThread().interrupt();
      }
      accumulatedTime += delta;
    }
  }
  // Requests still unresolved after the deadline are handled like failed revives.
  for (int i = index; i < reviveRequests.length; i++) {
    ReviveRequest request = reviveRequests[i];
    DataBatches.DataBatch batch = batches.get(i);
    if (remainReviveTimes > 0) {
      reviveFailedBatches.add(batch);
    } else {
      recordMergedDataReviveFailure(
          pushState, shuffleId, mapId, attemptId, request, batch, cause, oldGroupedBatchId);
      return;
    }
  }
  // Re-push the successfully revived batches, one merged push per (host, port) address pair.
  for (Map.Entry<Pair<String, String>, DataBatches> entry : newDataBatchesMap.entrySet()) {
    Pair<String, String> addressPair = entry.getKey();
    DataBatches newDataBatches = entry.getValue();
    doPushMergedData(
        addressPair,
        shuffleId,
        mapId,
        attemptId,
        newDataBatches.requireBatches(),
        pushState,
        remainReviveTimes);
  }
  if (reviveFailedBatches.isEmpty()) {
    pushState.removeBatch(oldGroupedBatchId, batches.get(0).loc.hostAndPushPort());
  } else {
    // Issue a new revive round for the failed batches and retry asynchronously with a new deadline.
    ReviveRequest[] requests =
        addAndGetReviveRequests(shuffleId, mapId, attemptId, reviveFailedBatches, cause);
    pushDataRetryPool.submit(
        () ->
            submitRetryPushMergedData(
                pushState,
                shuffleId,
                mapId,
                attemptId,
                reviveFailedBatches,
                cause,
                oldGroupedBatchId,
                requests,
                remainReviveTimes - 1,
                System.currentTimeMillis()
                    + conf.clientRpcRequestPartitionLocationRpcAskTimeout()
                        .duration()
                        .toMillis()));
  }
}

/**
 * Records a terminal revive failure for one batch of a merged push.
 *
 * <p>The exception is stored in {@code pushState.exception} via {@code compareAndSet(null, ...)},
 * so only the first recorded failure is kept.
 */
private static void recordMergedDataReviveFailure(
    PushState pushState,
    int shuffleId,
    int mapId,
    int attemptId,
    ReviveRequest request,
    DataBatches.DataBatch batch,
    StatusCode cause,
    Integer oldGroupedBatchId) {
  String errorMsg =
      String.format(
          "Revive failed while pushing merged for shuffle %d map %d attempt %d partition %d batch %d location %s.",
          shuffleId, mapId, attemptId, request.partitionId, oldGroupedBatchId, batch.loc);
  pushState.exception.compareAndSet(
      null,
      new CelebornIOException(
          errorMsg,
          new CelebornIOException(
              cause
                  + " then revive but "
                  + request.reviveStatus
                  + "("
                  + Utils.toStatusCode(request.reviveStatus)
                  + ")")));
}