in integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala [270:433]
private def distributeColumnarSplits(splits: List[InputSplit]): mutable.Buffer[Partition] = {
// this function distributes the splits based on the following logic:
// 1. data locality, so that splits are balanced across all available nodes
// 2. if the number of splits for one node exceeds the parallelism, multiple splits
//    are combined into a single task
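// Depending on the table and configuration, the branches below build partitions via one of:
// - bucket distribution: one partition per bucket
// - custom distribution: node/task mapping via CarbonLoaderUtil
// - blocklet distribution: one partition per blocklet split
// - merge-files distribution: splits bin-packed up to maxSplitBytes per partition
// - block distribution (default): one partition per block split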
var statistic = new QueryStatistic()
val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder()
var parallelism = sparkContext.defaultParallelism
val result = new ArrayList[Partition](parallelism)
var noOfBlocks = 0
var noOfNodes = 0
var noOfTasks = 0
if (!splits.isEmpty) {
statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
statisticRecorder.recordStatisticsForDriver(statistic, queryId)
statistic = new QueryStatistic()
// When the table has column drift, different blocks may have different schemas.
// A query cannot scan blocks with different schemas within a single task.
// So if the table has column drift, CARBON_TASK_DISTRIBUTION_MERGE_FILES and
// CARBON_TASK_DISTRIBUTION_CUSTOM cannot be used.
val carbonDistribution = if (directFill && !tableInfo.hasColumnDrift) {
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES
} else {
CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
}
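// Illustrative usage only (not executed here): the task distribution mode is normally
// configured through CarbonProperties, e.g.
//   CarbonProperties.getInstance().addProperty(
//     CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
//     CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET)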
// If bucketing is enabled on the table, then partitions should be grouped based on buckets.
val bucketInfo = tableInfo.getFactTable.getBucketingInfo
if (bucketInfo != null) {
var i = 0
val bucketed =
splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => f.getBucketId)
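// build one CarbonMultiBlockSplit, and hence one Spark partition, per bucket range so
// that each bucket is scanned by exactly one task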
(0 until bucketInfo.getNumOfRanges).foreach { bucketId =>
val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil)
val multiBlockSplit =
new CarbonMultiBlockSplit(
bucketPartitions.asJava,
bucketPartitions.flatMap(_.getLocations).toArray)
val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
i += 1
result.add(partition)
}
} else {
val useCustomDistribution =
CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
"false").toBoolean ||
carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM)
if (useCustomDistribution && !tableInfo.hasColumnDrift) {
// create a list of blocks based on the splits
val blockList = splits.asScala.map(_.asInstanceOf[Distributable])
// get the list of executors and map blocks to executors based on locality
val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext)
// divide the blocks among the tasks of the nodes as per the data locality
val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1,
parallelism, activeNodes.toList.asJava)
var i = 0
// Create Spark Partition for each task and assign blocks
nodeBlockMapping.asScala.foreach { case (node, tasksForNode) =>
tasksForNode.asScala.foreach { blocksPerTask =>
if (!blocksPerTask.isEmpty) {
val taskSplits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
val multiBlockSplit =
new CarbonMultiBlockSplit(taskSplits.asJava, Array(node))
val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
result.add(partition)
i += 1
}
}
}
noOfNodes = nodeBlockMapping.size
} else if (carbonDistribution.equalsIgnoreCase(
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET)) {
// Use blocklet distribution
// Randomize the order of the blocklets for a more even distribution across tasks
Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex =>
val multiBlockSplit =
new CarbonMultiBlockSplit(
Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
splitWithIndex._1.getLocations)
val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
result.add(partition)
}
} else if (!tableInfo.hasColumnDrift && carbonDistribution.equalsIgnoreCase(
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)) {
// group the splits by file, then sort the merged splits in descending order of length
val blockSplits = splits
.asScala
.map(_.asInstanceOf[CarbonInputSplit])
.groupBy(f => f.getFilePath)
.map { blockSplitEntry =>
new CarbonMultiBlockSplit(
blockSplitEntry._2.asJava,
blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray)
}.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse)
val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
val defaultParallelism = spark.sparkContext.defaultParallelism
val totalBytes = blockSplits.map(_.getLength + openCostInBytes).sum
val bytesPerCore = totalBytes / defaultParallelism
val maxSplitBytes = Math
.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
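// Illustrative example (hypothetical numbers): with filesMaxPartitionBytes = 128 MB,
// openCostInBytes = 4 MB, ~10 GB of total split bytes and defaultParallelism = 100,
// bytesPerCore is roughly 100 MB, so maxSplitBytes = min(128 MB, max(4 MB, ~100 MB)) ≈ 100 MB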
LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
s"open cost is considered as scanning $openCostInBytes bytes.")
val currentFiles = new ArrayBuffer[CarbonMultiBlockSplit]
var currentSize = 0L
def closePartition(): Unit = {
if (currentFiles.nonEmpty) {
result.add(combineSplits(currentFiles, currentSize, result.size()))
}
currentFiles.clear()
currentSize = 0
}
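// greedily pack the splits (already sorted largest-first) into partitions: whenever adding
// the next split would push the current partition past maxSplitBytes, close it and start a new one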
blockSplits.foreach { file =>
if (currentSize + file.getLength > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize += file.getLength + openCostInBytes
currentFiles += file
}
closePartition()
} else {
// Use block distribution
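// one partition per block split; no preferred locations are supplied for these partitions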
splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).zipWithIndex.foreach {
splitWithIndex =>
val multiBlockSplit =
new CarbonMultiBlockSplit(
Seq(splitWithIndex._1).asJava,
null)
val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit)
result.add(partition)
}
}
}
noOfBlocks = splits.size
noOfTasks = result.size()
statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION,
System.currentTimeMillis)
statisticRecorder.recordStatisticsForDriver(statistic, queryId)
statisticRecorder.logStatisticsAsTableDriver()
}
logInfo(
s"""
| Identified no.of.blocks: $noOfBlocks,
| no.of.tasks: $noOfTasks,
| no.of.nodes: $noOfNodes,
| parallelism: $parallelism
""".stripMargin)
result.asScala
}