in connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala [145:182]
/** Returns a copy of this RDD with `partitioner` installed as the override partitioner.
  *
  * Only a [[CassandraPartitioner]] may be assigned; any other `Partitioner` is rejected
  * with an `IllegalArgumentException`. When this RDD already carries a
  * `CassandraPartitioner`, its key mapping is transferred onto the incoming one so the
  * established key-to-column mapping survives the swap. A `None` clears nothing and is
  * passed through unchanged.
  */
private[connector] def withPartitioner[K, V, T <: ConnectorToken[V]](
  partitioner: Option[Partitioner]): CassandraTableScanRDD[R] = {

  // Validate and adapt the requested partitioner before handing it to the copy.
  val resolvedPartitioner: Option[Partitioner] = partitioner match {
    case None => None
    case Some(incoming: CassandraPartitioner[_, _, _]) =>
      val adapted = this.partitioner match {
        case Some(existing: CassandraPartitioner[_, _, _]) =>
          // Carry the current key mapping over to the incoming partitioner.
          logDebug(
            s"""Preserving Partitioner: $existing with mapping
               |${existing.keyMapping}""".stripMargin)
          incoming
            .withTableDef(tableDef)
            .withKeyMapping(existing.keyMapping)
        case _ =>
          // No prior Cassandra partitioner: just bind the table definition.
          logDebug(s"Assigning new Partitioner $incoming")
          incoming.withTableDef(tableDef)
      }
      Some(adapted)
    case Some(other: Partitioner) =>
      throw new IllegalArgumentException(
        s"""Unable to assign
           |non-CassandraPartitioner $other to CassandraTableScanRDD """.stripMargin)
  }

  // Rebuild the RDD with every setting preserved except the override partitioner.
  new CassandraTableScanRDD[R](
    sc = sc,
    connector = connector,
    keyspaceName = keyspaceName,
    tableName = tableName,
    columnNames = columnNames,
    where = where,
    limit = limit,
    clusteringOrder = clusteringOrder,
    readConf = readConf,
    overridePartitioner = resolvedPartitioner)
}