private void writeImpl()

in client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java [212:265]


  /**
   * Drains {@code records} into the buffer manager, forwards every batch of blocks the buffer
   * manager hands back, and finishes the write by checking the send result and, unless memory
   * shuffle is enabled, sending the commit.
   *
   * <p>When the shuffle dependency requests map-side combine, each value is first passed through
   * the aggregator's {@code createCombiner} function before it is buffered; otherwise the raw
   * value is buffered.
   *
   * <p>The reported write time is {@code bufferManager.getWriteTime()} plus the wall-clock time
   * elapsed from the final {@code bufferManager.clear()} call until after the optional commit.
   *
   * @param records key/value pairs to write for this task
   */
  private void writeImpl(Iterator<Product2<K, V>> records) {
    final boolean mapSideCombine = shuffleDependency.mapSideCombine();
    // The combiner factory is only looked up when map-side combine is requested.
    final Function1<V, C> combinerFactory =
        mapSideCombine ? shuffleDependency.aggregator().get().createCombiner() : null;

    while (records.hasNext()) {
      // Task should fast fail when sending data failed
      checkIfBlocksFailed();

      final Product2<K, V> entry = records.next();
      final int partitionId = getPartition(entry._1());
      final List<ShuffleBlockInfo> readyBlocks;
      if (!mapSideCombine) {
        readyBlocks = bufferManager.addRecord(partitionId, entry._1(), entry._2());
      } else {
        Object combined = combinerFactory.apply(entry._2());
        readyBlocks = bufferManager.addRecord(partitionId, entry._1(), combined);
      }
      // Nothing was handed back for this record; move on to the next one.
      if (readyBlocks == null || readyBlocks.isEmpty()) {
        continue;
      }
      processShuffleBlockInfos(readyBlocks);
    }

    // Everything from here on is counted towards the reported write duration.
    final long flushStartMs = System.currentTimeMillis();
    final List<ShuffleBlockInfo> leftoverBlocks = bufferManager.clear();
    if (leftoverBlocks != null && !leftoverBlocks.isEmpty()) {
      processShuffleBlockInfos(leftoverBlocks);
    }

    final long checkStartMs = System.currentTimeMillis();
    checkBlockSendResult(blockIds);
    final long commitStartMs = System.currentTimeMillis();
    // The commit is skipped when memory shuffle is enabled.
    if (!isMemoryShuffleEnabled) {
      sendCommit();
    }

    final long bufferWriteMs = bufferManager.getWriteTime();
    final long writeDurationMs = bufferWriteMs + (System.currentTimeMillis() - flushStartMs);
    shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs));

    final long checkDurationMs = commitStartMs - checkStartMs;
    final long commitDurationMs = System.currentTimeMillis() - commitStartMs;
    final String summary =
        "Finish write shuffle for appId[" + appId
            + "], shuffleId[" + shuffleId
            + "], taskId[" + taskId
            + "] with write " + writeDurationMs
            + " ms, include checkSendResult[" + checkDurationMs
            + "], commit[" + commitDurationMs
            + "], " + bufferManager.getManagerCostInfo();
    LOG.info(summary);
  }