override def readSchema()

in spark/common/src/main/scala/org/apache/sedona/sql/datasources/spider/SpiderScanBuilder.scala [43:132]


    override def readSchema(): StructType = SpiderTable.SCHEMA

    override def toBatch: Batch = new Batch {
      override def planInputPartitions(): Array[InputPartition] = {
        if (distribution != "parcel") {
          val finalNumPartitions = Math.min(numPartitions, numRows).toInt
          val partitionInfo = computePartitionRanges(finalNumPartitions, numRows)
          val optsMap = opts.asCaseSensitiveMap()
          partitionInfo.zipWithIndex.map { case ((startIndex, partitionRows), iPartition) =>
            SpiderPartition(
              iPartition,
              startIndex,
              partitionRows,
              seed + iPartition,
              distribution,
              transform,
              optsMap)
          }.toArray
        } else {
          // Parcel distribution requires special partition generation to ensure that the records are
          // non overlapping.
          val maxPartitionBoxes = Math.min(numPartitions, numRows)
          if (maxPartitionBoxes == 0) {
            return Array.empty[InputPartition]
          }

          // The actual number of partitions is a power of 4 that is less than or equal to the number of
          // requested partitions. This is for distributing the records evenly across the partitions.
          val finalNumPartitions =
            Math.pow(4, Math.floor(Math.log(maxPartitionBoxes) / Math.log(4))).toInt

          // Generate the partitions using the parcel generator but set dithering to zero since dithering
          // should only be applied on the final records and not the partitions
          val conf = new util.HashMap[String, String]()
          conf.putAll(opts.asCaseSensitiveMap())
          conf.put("dither", "0")
          conf.put("cardinality", finalNumPartitions.toString)
          val random = new Random(seed - 1)
          val parcelGenerator =
            GeneratorFactory.create("parcel", random, conf).asInstanceOf[ParcelGenerator]

          // Generate the partitions
          val partitionInfo = computePartitionRanges(finalNumPartitions, numRows)
          partitionInfo.zipWithIndex.map { case ((startIndex, partitionRows), iPartition) =>
            val partitionBox = parcelGenerator.generateBox()
            val partitionTransform = transform.transform(partitionBox)
            val partitionOpts = new util.HashMap[String, String]()
            partitionOpts.putAll(opts.asCaseSensitiveMap())
            partitionOpts.put("cardinality", partitionRows.toString)
            SpiderPartition(
              iPartition,
              startIndex,
              partitionRows,
              seed + iPartition,
              distribution,
              partitionTransform,
              partitionOpts)
          }.toArray
        }
      }

      /**
       * Computes the start index and number of rows for each partition
       *
       * @param numPartitions
       *   The number of partitions to create
       * @param totalRows
       *   The total number of rows to distribute
       * @return
       *   Sequence of (startIndex, numRows) for each partition
       */
      private def computePartitionRanges(
          numPartitions: Int,
          totalRows: Long): Seq[(Long, Long)] = {
        var recordsRemaining = totalRows
        (0 until numPartitions).map { iPartition =>
          val startIndex = totalRows - recordsRemaining
          val partitionRows = recordsRemaining / (numPartitions - iPartition)
          recordsRemaining -= partitionRows
          (startIndex, partitionRows)
        }
      }

      override def createReaderFactory(): PartitionReaderFactory = {
        (partition: InputPartition) =>
          {
            new SpiderPartitionReader(partition.asInstanceOf[SpiderPartition])
          }
      }
    }