in connector/src/main/scala/com/datastax/spark/connector/datasource/JoinHelper.scala [53:91]
/** Builds the CQL `SELECT` statement text used to look up rows by join key.
  *
  * Each join column contributes a `column = :column` equality term; these terms are
  * appended (with `AND`) after the predicates already present in the where clause of
  * `queryParts`. The selected columns, the optional clustering order and the limit
  * clause are all taken from `queryParts`.
  *
  * @param tableDef    table whose keyspace and table names form the `FROM` target; also
  *                    passed to the clustering order when rendering it
  * @param joinColumns columns being joined on; their names become the placeholder terms
  * @param queryParts  selected columns, where predicates, clustering order and limit
  * @return the assembled query string
  * @throws IllegalArgumentException if a join column is also referenced by an equality,
  *                                  `IN`, `IN`-list or range predicate of the where clause
  */
def getJoinQueryString(
  tableDef: TableDef,
  joinColumns: Seq[ColumnRef],
  queryParts: CqlQueryParts) = {

  // Renders an internal identifier as CQL text through the driver's CqlIdentifier.
  def asCqlName(internalName: String): String =
    CqlIdentifier.fromInternal(internalName).asCql(true)

  val parsedPredicates = queryParts.whereClause.predicates.flatMap(CqlWhereParser.parse)
  val joinColumnNames = joinColumns.map(_.columnName)

  // Pull out the column each recognised predicate refers to, then keep only those
  // that are also join columns: these would be constrained twice in the query.
  val predicateColumns = parsedPredicates.collect {
    case EqPredicate(column, _) => column
    case InPredicate(column) => column
    case InListPredicate(column, _) => column
    case RangePredicate(column, _, _) => column
  }
  val conflictingColumns =
    predicateColumns.filter(column => joinColumnNames.contains(column)).toSet

  require(
    conflictingColumns.isEmpty,
    s"""Columns specified in both the join on clause and the where clause.
       |Partition key columns are always part of the join clause.
       |Columns in both: ${conflictingColumns.mkString(", ")}""".stripMargin
  )

  logDebug("Generating Single Key Query Prepared Statement String")
  logDebug(s"SelectedColumns : ${queryParts.selectedColumnRefs} -- JoinColumnNames : $joinColumnNames")

  val selection = queryParts.selectedColumnRefs.map(_.cql).mkString(", ")
  // One "<column> = :<column>" term per join column.
  val joinTerms = joinColumnNames.map(name => s"${asCqlName(name)} = :$name")
  val limit = CassandraLimit.limitToClause(queryParts.limitClause)
  // Empty string when no clustering order was requested.
  val ordering = queryParts.clusteringOrder.fold("")(_.toCql(tableDef))
  val conditions = (queryParts.whereClause.predicates ++ joinTerms).mkString(" AND ")
  val keyspace = asCqlName(tableDef.keyspaceName)
  val table = asCqlName(tableDef.tableName)

  val query = s"SELECT $selection FROM $keyspace.$table WHERE $conditions $ordering $limit"
  logDebug(s"Query : $query")
  query
}