private def internalCHWrite(records: Iterator[Product2[K, V]]): Unit

in backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala [83:177]


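  /**
   * Drives the native ClickHouse shuffle splitter: columnar batches are split by
   * partition in native code (spilling to disk under memory pressure), written to a
   * single shuffle data file plus index, and the native timing/size counters are
   * propagated into the Spark shuffle metrics.
   */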
  private def internalCHWrite(records: Iterator[Product2[K, V]]): Unit = {
    val splitterJniWrapper: CHShuffleSplitterJniWrapper = jniWrapper
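    // Fast path: no input records. Commit an empty map output (all partition
    // lengths zero, no data file) and report the map status immediately.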
    if (!records.hasNext) {
      partitionLengths = new Array[Long](dep.partitioner.numPartitions)
      shuffleBlockResolver.writeMetadataFileAndCommit(
        dep.shuffleId,
        mapId,
        partitionLengths,
        Array[Long](),
        null)
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
      return
    }
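    // The native splitter writes to a temp file next to the final shuffle data
    // file; the temp file is committed (moved into place) after a successful stop.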
    val dataTmp = Utils.tempFileWith(shuffleBlockResolver.getDataFile(dep.shuffleId, mapId))
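    // Lazily create the native splitter on first use, passing the partitioning
    // scheme, compression codec, spill settings and the temp output path.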
    if (nativeSplitter == 0) {
      nativeSplitter = splitterJniWrapper.make(
        dep.nativePartitioning,
        dep.shuffleId,
        mapId,
        splitSize,
        customizedCompressCodec,
        dataTmp.getAbsolutePath,
        localDirs,
        subDirsPerLocalDir,
        preferSpill,
        spillThreshold,
        CHBackendSettings.shuffleHashAlgorithm,
        throwIfMemoryExceed,
        flushBlockBufferBeforeEvict
      )
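      // Register the splitter as a spillable memory target so that, under memory
      // pressure, the native allocator can ask it to evict buffered shuffle data.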
      CHNativeMemoryAllocators.createSpillable(
        "ShuffleWriter",
        new Spiller() {
          override def spill(self: MemoryTarget, size: Long): Long = {
            if (nativeSplitter == 0) {
              throw new IllegalStateException(
                "Fatal: spill() called before a shuffle writer " +
                  "is created. This behavior should be optimized by moving memory " +
                  "allocations from make() to split()")
            }
            logInfo(s"Gluten shuffle writer: Trying to spill $size bytes of data")
            val spilled = splitterJniWrapper.evict(nativeSplitter)
            logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of data")
            spilled
          }

          override def applicablePhases(): util.Set[Spiller.Phase] = Spillers.PHASE_SET_SPILL_ONLY
        }
      )
    }
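    // Hand each non-empty batch to the native splitter via the block address held
    // by its first CHColumnVector, and update the input-side metrics.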
    while (records.hasNext) {
      val cb = records.next()._2.asInstanceOf[ColumnarBatch]
      if (cb.numRows == 0 || cb.numCols == 0) {
        logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols")
      } else {
        firstRecordBatch = false
        val col = cb.column(0).asInstanceOf[CHColumnVector]
        val block = col.getBlockAddress
        splitterJniWrapper.split(nativeSplitter, block)
        dep.metrics("numInputRows").add(cb.numRows)
        dep.metrics("inputBatches").add(1)
        writeMetrics.incRecordsWritten(cb.numRows)
      }
    }
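    // Finish the native write and collect the split result.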
    splitResult = splitterJniWrapper.stop(nativeSplitter)

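    // Propagate the native-side timing and size counters into the operator metrics
    // and Spark's shuffle write metrics.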
    dep.metrics("splitTime").add(splitResult.getSplitTime)
    dep.metrics("IOTime").add(splitResult.getDiskWriteTime)
    dep.metrics("serializeTime").add(splitResult.getSerializationTime)
    dep.metrics("spillTime").add(splitResult.getTotalSpillTime)
    dep.metrics("compressTime").add(splitResult.getTotalCompressTime)
    dep.metrics("computePidTime").add(splitResult.getTotalComputePidTime)
    dep.metrics("bytesSpilled").add(splitResult.getTotalBytesSpilled)
    dep.metrics("dataSize").add(splitResult.getTotalBytesWritten)
    writeMetrics.incBytesWritten(splitResult.getTotalBytesWritten)
    writeMetrics.incWriteTime(splitResult.getTotalWriteTime + splitResult.getTotalSpillTime)

    partitionLengths = splitResult.getPartitionLengths
    rawPartitionLengths = splitResult.getRawPartitionLengths
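    // Commit the temp data file and write the index/metadata file; the temp file
    // is deleted afterwards if the resolver did not already move it into place.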
    try {
      shuffleBlockResolver.writeMetadataFileAndCommit(
        dep.shuffleId,
        mapId,
        partitionLengths,
        Array[Long](),
        dataTmp)
    } finally {
      if (dataTmp.exists() && !dataTmp.delete()) {
        logError(s"Error while deleting temp file ${dataTmp.getAbsolutePath}")
      }
    }

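    // Report the per-partition output lengths back to the driver.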
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
  }