in client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java [887:965]
/**
 * Finishes this writer's shuffle output for the current task attempt.
 *
 * <p>When {@code success} is true, the collected block ids are reported through {@code
 * shuffleWriteClient.reportShuffleResult}, the report duration is added to the task's write time,
 * and a {@link MapStatus} is built whose {@link BlockManagerId} uses {@code DUMMY_HOST}/{@code
 * DUMMY_PORT} and carries the task attempt id. When {@code success} is false, nothing is
 * reported.
 *
 * <p>In every case, write metrics are reported to the driver on a best-effort basis (failures are
 * only logged), and the buffer manager's memory and the task's metadata are released.
 *
 * @param success whether the task finished writing successfully
 * @return the map status on success; {@link Option#empty()} when {@code success} is false, or when
 *     block-resend retry is enabled and failed block ids are still recorded for this task
 */
public Option<MapStatus> stop(boolean success) {
  try {
    if (!success) {
      return Option.empty();
    }
    MapStatus mapStatus;
    try {
      long start = System.currentTimeMillis();
      shuffleWriteClient.reportShuffleResult(
          serverToPartitionToBlockIds,
          appId,
          shuffleId,
          taskAttemptId,
          bitmapSplitNum,
          recordReportFailedShuffleservers,
          enableWriteFailureRetry);
      long reportDuration = System.currentTimeMillis() - start;
      LOG.info(
          "Reported all shuffle result for shuffleId[{}] task[{}] with bitmapNum[{}] cost {} ms",
          shuffleId,
          taskAttemptId,
          bitmapSplitNum,
          reportDuration);
      shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(reportDuration));
      // todo: we can replace the dummy host and port with the real shuffle server which we prefer
      // to read
      final BlockManagerId blockManagerId =
          BlockManagerId.apply(
              appId + "_" + taskId,
              DUMMY_HOST,
              DUMMY_PORT,
              Option.apply(Long.toString(taskAttemptId)));
      mapStatus = MapStatus.apply(blockManagerId, partitionLengths, taskAttemptId);
    } catch (Exception e) {
      // If an exception is thrown during the reporting process, it should be judged as a failure
      // and Stage retry should be triggered.
      if (enableWriteFailureRetry) {
        throw throwFetchFailedIfNecessary(e, recordReportFailedShuffleservers);
      }
      throw e;
    }
    // Checked here, not in the finally block: a `return` inside `finally` would silently discard
    // any exception thrown by the reporting above and would skip the memory/metadata cleanup.
    if (blockFailSentRetryEnabled
        && CollectionUtils.isNotEmpty(shuffleManager.getFailedBlockIds(taskId))) {
      LOG.error(
          "Errors on stopping writer due to the remaining failed blockIds. This should not happen.");
      return Option.empty();
    }
    return Option.apply(mapStatus);
  } finally {
    try {
      // Report shuffle write metrics to driver. This is best-effort: a failure here must neither
      // mask an exception from the try block nor prevent the cleanup below.
      if (managerClientSupplier != null && bufferManager != null) {
        try {
          ShuffleManagerClient shuffleManagerClient = managerClientSupplier.get();
          if (shuffleManagerClient != null) {
            RssReportShuffleWriteMetricResponse response =
                shuffleManagerClient.reportShuffleWriteMetric(
                    new RssReportShuffleWriteMetricRequest(
                        taskContext.stageId(),
                        shuffleId,
                        taskContext.taskAttemptId(),
                        bufferManager.getShuffleServerPushCostTracker().toMetric()));
            if (response.getStatusCode() != StatusCode.SUCCESS) {
              LOG.error("Errors on reporting shuffle write metrics to driver");
            }
          }
        } catch (Exception e) {
          LOG.error("Errors on reporting shuffle write metrics to driver", e);
        }
      }
      // On failure, drop the resources still held by the failed-send tracker for this task.
      if (blockFailSentRetryEnabled && !success) {
        shuffleManager.getBlockIdsFailedSendTracker(taskId).clearAndReleaseBlockResources();
      }
    } finally {
      // Free all memory & metadata, or memory leak happen in executor. Nested so that a failure
      // freeing buffers still clears the task metadata.
      try {
        if (bufferManager != null) {
          bufferManager.freeAllMemory();
        }
      } finally {
        if (shuffleManager != null) {
          shuffleManager.clearTaskMeta(taskId);
        }
      }
    }
  }
}