public Option stop()

in client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java [887:965]


  /**
   * Stops this writer for the current task.
   *
   * <p>When {@code success} is true, the shuffle result is reported through {@code
   * shuffleWriteClient} and a {@link MapStatus} carrying a placeholder block manager address is
   * built. Regardless of the outcome, shuffle write metrics are reported to the driver (best
   * effort), pending failed-block state is handled, and buffer memory and task metadata are always
   * released.
   *
   * @param success whether the task finished its writes successfully
   * @return the map status wrapped in an {@code Option}; empty when {@code success} is false, or
   *     when block-resend is enabled and failed block ids are still registered for this task
   * @throws RuntimeException whatever the reporting step throws; when {@code
   *     enableWriteFailureRetry} is set, the exception produced by {@code
   *     throwFetchFailedIfNecessary} is thrown instead
   */
  public Option<MapStatus> stop(boolean success) {
    // The result is carried in a local and returned after the try statement. Returning from inside
    // a finally block would discard any in-flight exception and skip the cleanup that follows it.
    Option<MapStatus> result = Option.empty();
    try {
      if (success) {
        long start = System.currentTimeMillis();
        shuffleWriteClient.reportShuffleResult(
            serverToPartitionToBlockIds,
            appId,
            shuffleId,
            taskAttemptId,
            bitmapSplitNum,
            recordReportFailedShuffleservers,
            enableWriteFailureRetry);
        long reportDuration = System.currentTimeMillis() - start;
        LOG.info(
            "Reported all shuffle result for shuffleId[{}] task[{}] with bitmapNum[{}] cost {} ms",
            shuffleId,
            taskAttemptId,
            bitmapSplitNum,
            reportDuration);
        shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(reportDuration));
        // todo: we can replace the dummy host and port with the real shuffle server which we prefer
        // to read
        final BlockManagerId blockManagerId =
            BlockManagerId.apply(
                appId + "_" + taskId,
                DUMMY_HOST,
                DUMMY_PORT,
                Option.apply(Long.toString(taskAttemptId)));
        MapStatus mapStatus = MapStatus.apply(blockManagerId, partitionLengths, taskAttemptId);
        result = Option.apply(mapStatus);
      }
    } catch (Exception e) {
      // If an exception is thrown during the reporting process, it should be judged as a failure
      // and Stage retry should be triggered.
      if (enableWriteFailureRetry) {
        throw throwFetchFailedIfNecessary(e, recordReportFailedShuffleservers);
      } else {
        throw e;
      }
    } finally {
      try {
        reportShuffleWriteMetricsOnStop();

        if (blockFailSentRetryEnabled && shuffleManager != null) {
          if (success) {
            if (CollectionUtils.isNotEmpty(shuffleManager.getFailedBlockIds(taskId))) {
              LOG.error(
                  "Errors on stopping writer due to the remaining failed blockIds. This should not happen.");
              // Only affects the normal-completion path; a pending exception still propagates.
              result = Option.empty();
            }
          } else {
            shuffleManager.getBlockIdsFailedSendTracker(taskId).clearAndReleaseBlockResources();
          }
        }
      } finally {
        // Must run even if the steps above throw, or memory leak happen in executor.
        releaseResourcesOnStop();
      }
    }
    return result;
  }

  /**
   * Reports shuffle write metrics to the driver. Failures are logged rather than propagated so
   * that this auxiliary step can neither mask the primary exception nor prevent cleanup.
   */
  private void reportShuffleWriteMetricsOnStop() {
    if (managerClientSupplier == null || bufferManager == null) {
      return;
    }
    try {
      ShuffleManagerClient shuffleManagerClient = managerClientSupplier.get();
      if (shuffleManagerClient == null) {
        return;
      }
      RssReportShuffleWriteMetricResponse response =
          shuffleManagerClient.reportShuffleWriteMetric(
              new RssReportShuffleWriteMetricRequest(
                  taskContext.stageId(),
                  shuffleId,
                  taskContext.taskAttemptId(),
                  bufferManager.getShuffleServerPushCostTracker().toMetric()));
      if (response.getStatusCode() != StatusCode.SUCCESS) {
        LOG.error("Errors on reporting shuffle write metrics to driver");
      }
    } catch (Exception e) {
      LOG.error("Errors on reporting shuffle write metrics to driver", e);
    }
  }

  /**
   * Frees all buffer memory and clears the task metadata. The two steps are isolated so that a
   * failure while freeing memory does not leave the task metadata behind.
   */
  private void releaseResourcesOnStop() {
    try {
      if (bufferManager != null) {
        bufferManager.freeAllMemory();
      }
    } finally {
      if (shuffleManager != null) {
        shuffleManager.clearTaskMeta(taskId);
      }
    }
  }