private def createNonShardingReadRDD()

in src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala [185:264]
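
Builds the read RDD for the non-sharding scan path: it computes a target split size (optionally derived from the cluster's default parallelism), slices the selected files into splits with preferred block hosts, bin-packs the splits into FilePartitions, and returns a CacheFileScanRDD when soft affinity is enabled or a plain FileScanRDD otherwise.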


  private def createNonShardingReadRDD(readFile: PartitionedFile => Iterator[InternalRow],
                                       selectedPartitions: Seq[PartitionDirectory],
                                       fsRelation: HadoopFsRelation): RDD[InternalRow] = {

    val defaultMaxSplitBytes =
      fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
    val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes

    // Optionally derive the split size from the default parallelism: spread the
    // total input (plus the per-file open cost) evenly across cores, clamped to
    // [openCostInBytes, defaultMaxSplitBytes].
    val maxSplitBytes = if (KylinConfig.getInstanceFromEnv.readSourceWithDefaultParallelism()) {
      val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
      val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
      val bytesPerCore = totalBytes / defaultParallelism

      Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    } else {
      defaultMaxSplitBytes
    }
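    // Illustrative numbers (not from the source): with defaultParallelism = 16,
    // totalBytes = 1 GiB and openCostInBytes = 4 MiB, bytesPerCore is 64 MiB, so
    // maxSplitBytes = min(defaultMaxSplitBytes = 128 MiB, max(4 MiB, 64 MiB)) = 64 MiB.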
    logInfo(s"Planning scan with bin packing, max size is: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")

    // Slice each splittable file into chunks of at most maxSplitBytes, keeping each
    // chunk's preferred block hosts; non-splittable files become a single split.
    // Sort the splits largest-first for the bin packing below.
    val splitFiles = selectedPartitions.flatMap { partition =>
      partition.files.flatMap { file =>
        val blockLocations = getBlockLocations(file)
        if (fsRelation.fileFormat.isSplitable(
          fsRelation.sparkSession, fsRelation.options, file.getPath)) {
          (0L until file.getLen by maxSplitBytes).map { offset =>
            val remaining = file.getLen - offset
            val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
            val hosts = getBlockHosts(blockLocations, offset, size)
            PartitionedFile(
              partition.values, file.getPath.toUri.toString, offset, size, hosts)
          }
        } else {
          val hosts = getBlockHosts(blockLocations, 0, file.getLen)
          Seq(PartitionedFile(
            partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
        }
      }
    }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
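    // Illustration (not from the source): with maxSplitBytes = 128 MB, a 300 MB
    // splittable file becomes splits of 128 MB, 128 MB and 44 MB at offsets 0,
    // 128 MB and 256 MB; the descending sort then places the largest splits first.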

    val partitions = new ArrayBuffer[FilePartition]
    val currentFiles = new ArrayBuffer[PartitionedFile]
    var currentSize = 0L

    /** Close the current partition and move to the next. */
    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) {
        val newPartition =
          FilePartition(
            partitions.size,
            currentFiles.toArray) // Copy to a new Array.
        partitions += newPartition
      }
      currentFiles.clear()
      currentSize = 0
    }

    // Assign files to partitions using "Next Fit Decreasing"
    splitFiles.foreach { file =>
      if (currentSize + file.length > maxSplitBytes) {
        closePartition()
      }
      // Add the given file to the current partition.
      currentSize += file.length + openCostInBytes
      currentFiles += file
    }
    closePartition()
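
    // Illustrative trace (not from the source), with maxSplitBytes = 128 MB and
    // openCostInBytes = 4 MB: splits of 128, 100, 40 and 20 MB yield partitions
    // [128], [100] and [40, 20]. After packing 100 MB, currentSize is 104, so
    // 104 + 40 > 128 closes that bin; then 44 + 20 <= 128 keeps the last two together.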

    // With soft affinity enabled, wrap the partitions so tasks prefer executors
    // that already cache the underlying files; otherwise build a plain FileScanRDD.
    if (SoftAffinityManager.usingSoftAffinity) {
      val start = System.currentTimeMillis()
      val cachePartitions = partitions.map(CacheFilePartition.convertFilePartitionToCache)
      logInfo(s"Convert file partition took: ${System.currentTimeMillis() - start}")
      new CacheFileScanRDD(fsRelation.sparkSession, readFile, cachePartitions,
        requiredSchema, metadataColumns)
    } else {
      logInfo(s"Create FileScanRDD")
      new FileScanRDD(fsRelation.sparkSession, readFile, partitions,
        requiredSchema, metadataColumns)
    }
  }
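
For reference, a minimal, self-contained sketch of the same "Next Fit Decreasing" packing over bare split lengths; the object, names and values below are illustrative and not part of the Kylin source:

import scala.collection.mutable.ArrayBuffer

object NextFitDecreasingSketch {

  /** Packs split lengths into bins of at most maxSplitBytes, charging
   *  openCostInBytes per packed split, largest splits first. */
  def pack(lengths: Seq[Long], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Long]] = {
    val bins = ArrayBuffer.empty[Seq[Long]]
    val current = ArrayBuffer.empty[Long]
    var currentSize = 0L

    def closeBin(): Unit = {
      if (current.nonEmpty) bins += current.toList
      current.clear()
      currentSize = 0L
    }

    // Largest first, mirroring the sortBy(...).reverse above.
    lengths.sorted(Ordering[Long].reverse).foreach { len =>
      if (currentSize + len > maxSplitBytes) closeBin()
      currentSize += len + openCostInBytes
      current += len
    }
    closeBin()
    bins.toList
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // Reproduces the illustrative trace above: List(List(128), List(100), List(40, 20)).
    val packed = pack(Seq(40L, 128L, 20L, 100L).map(_ * mb), 128 * mb, 4 * mb)
    println(packed.map(_.map(_ / mb)))
  }
}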