in connector/src/main/scala/com/datastax/spark/connector/datasource/ScanHelper.scala [178:213]
def getPartitionGenerator(
    connector: CassandraConnector,
    tableDef: TableDef,
    whereClause: CqlWhereClause,
    minSplitCount: Int,
    partitionCount: Option[Int],
    splitSize: Long): CassandraPartitionGenerator[V, T] = {

  implicit val tokenFactory = TokenFactory.forSystemLocalPartitioner(connector)

  if (containsPartitionKey(tableDef, whereClause)) {
    // The where clause pins the partition key, so a single Spark partition is enough.
    CassandraPartitionGenerator(connector, tableDef, 1)
  } else {
    partitionCount match {
      case Some(splitCount) =>
        // The caller explicitly requested a split count; use it as-is.
        CassandraPartitionGenerator(connector, tableDef, splitCount)
      case None =>
        // Derive the split count from the system size_estimates for this keyspace/table.
        val estimateDataSize =
          new DataSizeEstimates(connector, tableDef.keyspaceName, tableDef.tableName).dataSizeInBytes
        val splitCount = if (estimateDataSize == Long.MaxValue || estimateDataSize < 0) {
          logWarning(
            s"""Size estimates have overflowed and the calculated data size is infinite.
               |Falling back to $minSplitCount (2 * SparkCores + 1) as the split count.
               |This most likely occurs because you are reading size_estimates
               |from a data center which has very small primary ranges. Explicitly set
               |the splitCount when reading to adjust this manually.""".stripMargin)
          minSplitCount
        } else {
          // Aim for roughly splitSize bytes per Spark partition, but never fewer
          // than max(minSplitCount, 1) partitions.
          val splitCountEstimate = estimateDataSize / splitSize
          Math.max(splitCountEstimate.toInt, Math.max(minSplitCount, 1))
        }
        CassandraPartitionGenerator(connector, tableDef, splitCount)
    }
  }
}
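
For orientation, the sketch below reproduces the fallback arithmetic of the None branch in isolation. It is not part of ScanHelper; the object and method names are illustrative, and only the parameter names (estimateDataSize, splitSize, minSplitCount) mirror the code above. The estimated table size in bytes is divided by the target split size, and the result is clamped from below by max(minSplitCount, 1); an unusable estimate (overflowed or negative) falls back to minSplitCount.

// Standalone sketch, assuming the same heuristic as the method above.
// Everything here is illustrative and not part of the connector.
object SplitCountSketch {

  def splitCount(estimateDataSize: Long, splitSize: Long, minSplitCount: Int): Int =
    if (estimateDataSize == Long.MaxValue || estimateDataSize < 0) {
      // No usable size estimate: fall back to the caller-provided minimum.
      minSplitCount
    } else {
      // Roughly splitSize bytes per partition, never fewer than max(minSplitCount, 1).
      Math.max((estimateDataSize / splitSize).toInt, Math.max(minSplitCount, 1))
    }

  def main(args: Array[String]): Unit = {
    val estimate = 96L * 1024 * 1024 * 1024  // e.g. 96 GiB reported by size_estimates
    val split    = 64L * 1024 * 1024         // e.g. 64 MiB target split size
    println(splitCount(estimate, split, minSplitCount = 17))  // prints 1536
  }
}

With these example numbers, 96 GiB divided by 64 MiB gives 1536 splits, which exceeds the minimum of 17, so the estimate wins; had the estimate been missing or overflowed, the method would have logged the warning and used 17.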