in client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java [215:307]
/**
 * Waits for a pending revive request to finish and, when it succeeded, re-pushes the batch to the
 * partition location currently registered for the partition.
 *
 * <p>The method polls {@code request.reviveStatus} every 50 ms while it still equals {@code
 * StatusCode.REVIVE_INITIALIZED} and the accumulated sleep time has not exceeded the wait budget
 * ({@code dueTime} minus the current time at entry). Afterwards exactly one of the following
 * happens:
 *
 * <ul>
 *   <li>if {@code mapperEnded(shuffleId, mapId)} is true, the batch is removed from {@code
 *       pushState} and nothing is pushed;
 *   <li>if the revive status is anything other than {@code StatusCode.SUCCESS}, {@code
 *       pushDataRpcResponseCallback.onFailure} is invoked with a {@link CelebornIOException}
 *       describing the original cause and the revive status;
 *   <li>otherwise the location is read from {@code reducePartitionMap} and, unless {@code
 *       isPushTargetWorkerExcluded} reports it as excluded, the data is pushed to it. Any
 *       exception raised while doing so is logged and forwarded to {@code onFailure} as {@code
 *       PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY}.
 * </ul>
 *
 * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored and the
 * wait ends immediately; the checks above are then evaluated against whatever revive status is
 * visible at that moment.
 *
 * @param shuffleId shuffle the batch belongs to
 * @param body raw bytes of the batch; wrapped (not copied) into the new push request
 * @param batchId id of the batch, used for logging and for removal from {@code pushState}
 * @param pushDataRpcResponseCallback callback notified of failures and handed to the retried push
 * @param pushState push bookkeeping from which the batch is removed when the mapper already ended
 * @param request revive request carrying map/attempt ids, old location, cause and revive status
 * @param remainReviveTimes remaining revive attempts; only consulted when {@code testRetryRevive}
 *     is set, in which case a value of 1 or more triggers a mock failure
 * @param dueTime absolute time in epoch milliseconds until which the revive result is awaited
 */
private void submitRetryPushData(
    int shuffleId,
    byte[] body,
    int batchId,
    PushDataRpcResponseCallback pushDataRpcResponseCallback,
    PushState pushState,
    ReviveRequest request,
    int remainReviveTimes,
    long dueTime) {
  int mapId = request.mapId;
  int attemptId = request.attemptId;
  PartitionLocation loc = request.loc;
  StatusCode cause = request.cause;
  int partitionId = loc.getId();
  long reviveWaitTime = dueTime - System.currentTimeMillis();
  final long delta = 50;
  long accumulatedTime = 0;
  while (request.reviveStatus == StatusCode.REVIVE_INITIALIZED.getValue()
      && accumulatedTime <= reviveWaitTime) {
    try {
      Thread.sleep(delta);
      accumulatedTime += delta;
    } catch (InterruptedException e) {
      logger.error("Interrupted while waiting for Revive result!");
      Thread.currentThread().interrupt();
      // With the interrupt flag restored, every further Thread.sleep would throw immediately
      // without advancing accumulatedTime, turning this loop into a busy spin that floods the
      // log. Stop waiting and let the status checks below decide the outcome.
      break;
    }
  }
  if (mapperEnded(shuffleId, mapId)) {
    logger.debug(
        "Revive for push data success, but the mapper already ended for shuffle {} map {} attempt {} partition {} batch {} location {}.",
        shuffleId,
        mapId,
        attemptId,
        partitionId,
        batchId,
        loc);
    pushState.removeBatch(batchId, loc.hostAndPushPort());
  } else if (request.reviveStatus != StatusCode.SUCCESS.getValue()) {
    // Covers failed revives as well as revives still pending after a timeout or an interrupt.
    pushDataRpcResponseCallback.onFailure(
        new CelebornIOException(
            cause
                + " then revive but "
                + StatusCode.REVIVE_FAILED
                + ", revive status "
                + request.reviveStatus
                + "("
                + Utils.toStatusCode(request.reviveStatus)
                + ")"
                + ", old location: "
                + request.loc));
  } else {
    PartitionLocation newLoc = reducePartitionMap.get(shuffleId).get(partitionId);
    logger.info(
        "Revive for push data success, new location for shuffle {} map {} attempt {} partition {} batch {} is location {}.",
        shuffleId,
        mapId,
        attemptId,
        partitionId,
        batchId,
        newLoc);
    try {
      if (!isPushTargetWorkerExcluded(newLoc, pushDataRpcResponseCallback)) {
        if (!testRetryRevive || remainReviveTimes < 1) {
          TransportClient client =
              dataClientFactory.createClient(newLoc.getHost(), newLoc.getPushPort(), partitionId);
          NettyManagedBuffer newBuffer = new NettyManagedBuffer(Unpooled.wrappedBuffer(body));
          String shuffleKey = Utils.makeShuffleKey(appUniqueId, shuffleId);
          PushData newPushData =
              new PushData(PRIMARY_MODE, shuffleKey, newLoc.getUniqueId(), newBuffer);
          // Point the callback at the new location before the push is issued.
          pushDataRpcResponseCallback.updateLatestPartition(newLoc);
          client.pushData(newPushData, pushDataTimeout, pushDataRpcResponseCallback);
        } else {
          // Test hook: simulate a failed retry while revive attempts remain.
          throw new RuntimeException(
              "Mock push data submit retry failed. remainReviveTimes = "
                  + remainReviveTimes
                  + ".");
        }
      }
    } catch (Exception e) {
      logger.error(
          "Exception raised while pushing data for shuffle {} map {} attempt {} partition {} batch {} location {}.",
          shuffleId,
          mapId,
          attemptId,
          partitionId,
          batchId,
          newLoc,
          e);
      pushDataRpcResponseCallback.onFailure(
          new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_PRIMARY, e));
    }
  }
}