private def distributeColumnarSplits()

in integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala [270:433]


  private def distributeColumnarSplits(splits: List[InputSplit]): mutable.Buffer[Partition] = {
    // this function distributes the splits based on the following logic:
    // 1. based on data locality, to keep the splits balanced across all available nodes
    // 2. based on the configured task distribution: bucket-wise, custom (node/block
    //    mapping), blocklet-wise, merge-files (size-based packing), or the default
    //    one-block-per-task distribution

    var statistic = new QueryStatistic()
    val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
    var parallelism = sparkContext.defaultParallelism
    val result = new ArrayList[Partition](parallelism)
    var noOfBlocks = 0
    var noOfNodes = 0
    var noOfTasks = 0

    if (!splits.isEmpty) {

      statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
      statisticRecorder.recordStatisticsForDriver(statistic, queryId)
      statistic = new QueryStatistic()
      // When the table has column drift, different blocks may have different schemas.
      // A query cannot scan blocks with different schemas within a single task.
      // So if the table has column drift, CARBON_TASK_DISTRIBUTION_MERGE_FILES and
      // CARBON_TASK_DISTRIBUTION_CUSTOM cannot be used.
      val carbonDistribution = if (directFill && !tableInfo.hasColumnDrift) {
        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES
      } else {
        CarbonProperties.getInstance().getProperty(
          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
      }
      // If bucketing is enabled on the table, partitions should be grouped by bucket.
      val bucketInfo = tableInfo.getFactTable.getBucketingInfo
      if (bucketInfo != null) {
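        // Each bucket maps to exactly one partition holding all of its splits,
        // so the table's bucket-wise partitioning is preserved in the scan.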
        var i = 0
        val bucketed =
          splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => f.getBucketId)
        (0 until bucketInfo.getNumOfRanges).foreach { bucketId =>
          val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil)
          val multiBlockSplit =
            new CarbonMultiBlockSplit(
              bucketPartitions.asJava,
              bucketPartitions.flatMap(_.getLocations).toArray)
          val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
          i += 1
          result.add(partition)
        }
      } else {
        val useCustomDistribution =
          CarbonProperties.getInstance().getProperty(
            CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
            "false").toBoolean ||
          carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM)
        if (useCustomDistribution && !tableInfo.hasColumnDrift) {
          // create a list of blocks based on the splits
          val blockList = splits.asScala.map(_.asInstanceOf[Distributable])

          // get the list of executors and map blocks to executors based on locality
          val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)

          // divide the blocks among the tasks of the nodes as per the data locality
          val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
            parallelism, activeNodes.toList.asJava)
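          // nodeBlockMapping: node name -> list of per-task block lists assigned to that node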
          var i = 0
          // Create Spark Partition for each task and assign blocks
          nodeBlockMapping.asScala.foreach { case (node, blockList) =>
            blockList.asScala.foreach { blocksPerTask =>
              val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
              if (blocksPerTask.size() != 0) {
                val multiBlockSplit =
                  new CarbonMultiBlockSplit(splits.asJava, Array(node))
                val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
                result.add(partition)
                i += 1
              }
            }
          }
          noOfNodes = nodeBlockMapping.size
        } else if (carbonDistribution.equalsIgnoreCase(
            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET)) {
          // Use blocklet distribution: one blocklet split per partition.
          // Shuffle the blocklets so they are distributed randomly across tasks.
          Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex =>
            val multiBlockSplit =
              new CarbonMultiBlockSplit(
                Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
                splitWithIndex._1.getLocations)
            val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
            result.add(partition)
          }
        } else if (!tableInfo.hasColumnDrift && carbonDistribution.equalsIgnoreCase(
            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)) {

          // group the splits by file and sort the resulting blocks in descending order of length
          val blockSplits = splits
            .asScala
            .map(_.asInstanceOf[CarbonInputSplit])
            .groupBy(f => f.getFilePath)
            .map { blockSplitEntry =>
              new CarbonMultiBlockSplit(
                blockSplitEntry._2.asJava,
                blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray)
            }.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse)

          val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
          val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
          val defaultParallelism = spark.sparkContext.defaultParallelism
          val totalBytes = blockSplits.map(_.getLength + openCostInBytes).sum
          val bytesPerCore = totalBytes / defaultParallelism

          val maxSplitBytes = Math
            .min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
          LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
                      s"open cost is considered as scanning $openCostInBytes bytes.")

          val currentFiles = new ArrayBuffer[CarbonMultiBlockSplit]
          var currentSize = 0L

          def closePartition(): Unit = {
            if (currentFiles.nonEmpty) {
              result.add(combineSplits(currentFiles, currentSize, result.size()))
            }
            currentFiles.clear()
            currentSize = 0
          }

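          // Greedily pack blocks (already sorted by descending size) into partitions:
          // a partition is closed whenever adding the next block would exceed maxSplitBytes,
          // so a single oversized block still gets a partition of its own.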
          blockSplits.foreach { file =>
            if (currentSize + file.getLength > maxSplitBytes) {
              closePartition()
            }
            // Add the given file to the current partition.
            currentSize += file.getLength + openCostInBytes
            currentFiles += file
          }
          closePartition()
        } else {
          // Default block distribution: one block per partition, with no preferred locations
          splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).zipWithIndex.foreach {
            splitWithIndex =>
            val multiBlockSplit =
              new CarbonMultiBlockSplit(
                Seq(splitWithIndex._1).asJava,
                null)
            val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
            result.add(partition)
          }
        }
      }

      noOfBlocks = splits.size
      noOfTasks = result.size()

      statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
        System.currentTimeMillis)
      statisticRecorder.recordStatisticsForDriver(statistic, queryId)
      statisticRecorder.logStatisticsAsTableDriver()
    }
    logInfo(
      s"""
         | Identified no.of.blocks: $noOfBlocks,
         | no.of.tasks: $noOfTasks,
         | no.of.nodes: $noOfNodes,
         | parallelism: $parallelism
       """.stripMargin)
    result.asScala
  }
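
The merge-files branch above is essentially a size-based packing of blocks into partitions. The sketch below reproduces that packing in isolation so its behaviour is easier to follow; the object name, the block sizes and the 128 MB / 4 MB limits are illustrative assumptions, not values taken from CarbonScanRDD or a real configuration.

// A minimal, self-contained sketch of the size-based packing used by the
// CARBON_TASK_DISTRIBUTION_MERGE_FILES branch. Block sizes and limits are
// hypothetical.
object MergeFilesPackingSketch {

  def pack(blockSizes: Seq[Long], maxSplitBytes: Long, openCostInBytes: Long): Seq[Seq[Long]] = {
    val partitions = scala.collection.mutable.ArrayBuffer[Seq[Long]]()
    val current = scala.collection.mutable.ArrayBuffer[Long]()
    var currentSize = 0L

    // close the current partition and start a new one
    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current.toList
      current.clear()
      currentSize = 0L
    }

    // pack blocks in descending order of size, mirroring the sortBy(...).reverse above
    blockSizes.sorted(Ordering[Long].reverse).foreach { size =>
      if (currentSize + size > maxSplitBytes) closePartition()
      currentSize += size + openCostInBytes
      current += size
    }
    closePartition()
    partitions.toList
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    // hypothetical block sizes for five carbondata files
    val blocks = Seq(200L * mb, 90L * mb, 60L * mb, 30L * mb, 10L * mb)
    val result = pack(blocks, maxSplitBytes = 128L * mb, openCostInBytes = 4L * mb)
    result.zipWithIndex.foreach { case (p, i) =>
      println(s"partition $i -> ${p.map(_ / mb).mkString(", ")} MB")
    }
  }
}

With these inputs the sketch yields three partitions: the 200 MB and 90 MB blocks each get their own partition because they exceed the 128 MB limit on their own or together, while the 60 MB, 30 MB and 10 MB blocks are merged into a single task.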