override def planInputPartitions()

in GlueCustomConnectors/development/Spark/glue-3.0/tpcds-custom-connector-for-glue3.0/src/main/scala/com/amazonaws/services/glue/marketplace/connector/tpcds/TPCDSScan.scala [66:108]


  /**
   * Plans the input partitions for generating the '${table}' TPC-DS dataset.
   *
   * In the teradata/tpcds library, a table whose requested row count is below a
   * table-specific threshold is generated as a SINGLE chunk; asking such a table
   * for chunk number 2 or more returns an empty iterator. We probe chunk 2 up
   * front to learn whether the data is actually chunked, then partition as follows:
   *
   * 1. Chunked data: one partition per chunk number, 1..numPartitions.
   * 2. Single-chunk data — compare numPartitions with the generated row count:
   *   2-1. numPartitions > rowCount: emit a single partition
   *        (the row count is small in this case, around 10,000).
   *   2-2. numPartitions <= rowCount: split the single chunk into numPartitions
   *        contiguous row ranges; the last range absorbs any remainder rows.
   *
   * @return the planned partitions, one per Spark task
   */
  override def planInputPartitions(): Array[InputPartition] = {
    logInfo(s"The '${table.getName}' data will be generated.")

    // Probe chunk 2: non-empty iterator means the dataset really is chunked.
    val isChunk: Boolean = TPCDSUtils.generateChunkIterator(table, scale, numPartitions, 2).hasNext

    if (isChunk) {
      // Pattern 1: one partition per chunk, generated in parallel by Spark.
      (1 to numPartitions)
        .map(chunkNumber => new TPCDSInputPartition(scale, table, numPartitions, chunkNumber, schema))
        .toArray[InputPartition]
    } else {
      logInfo("The generated data has a single chunk.")
      // Row count of the requested table's dataset (the single chunk holds all rows).
      val rowCount = TPCDSUtils.generateChunkIterator(table, scale, numPartitions, 1).length
      if (rowCount < numPartitions) {
        // Pattern 2-1: fewer rows than requested partitions — one partition suffices.
        // FIX: log the table name (consistent with the first log line of this
        // method) instead of the table object's toString.
        logInfo(s"The '${table.getName}' data will be generated with one partition because numPartitions is bigger than the row number of generated data.")
        Array[InputPartition](new TPCDSInputPartition(scale, table, numPartitions, 1, schema))
      } else {
        // Pattern 2-2: split the single chunk into numPartitions row ranges.
        // FIX: Int / Int already truncates toward zero, so the original
        // Math.floor(...).toInt round-trip through Double was redundant.
        val rowDelta = rowCount / numPartitions
        (0 until numPartitions)
          .map { partition =>
            // The last partition extends to rowCount so remainder rows are not dropped.
            val until = if (partition == numPartitions - 1) rowCount else (partition + 1) * rowDelta
            new TPCDSSingleChunkInputPartition(
              scale, table, numPartitions, 1, partition * rowDelta, until, schema)
          }
          .toArray[InputPartition]
      }
    }
  }