def getJoinQueryString()

in connector/src/main/scala/com/datastax/spark/connector/datasource/JoinHelper.scala [53:91]


  /** Builds the CQL SELECT used for key-by-key joins against `tableDef`: the
    * pushed-down predicates from `queryParts` are combined with one named bind
    * marker (`= :column`) per join column, plus optional ordering and limit.
    */
  def getJoinQueryString(
    tableDef: TableDef,
    joinColumns: Seq[ColumnRef],
    queryParts: CqlQueryParts): String = {

    val whereClauses = queryParts.whereClause.predicates.flatMap(CqlWhereParser.parse)
    val joinColumnNames = joinColumns.map(_.columnName)

    // Collect any join columns that are also constrained by a pushed-down
    // where-clause predicate; these are rejected below, since the join itself binds them.
    val joinColumnPredicates = whereClauses.collect {
      case EqPredicate(c, _) if joinColumnNames.contains(c) => c
      case InPredicate(c) if joinColumnNames.contains(c) => c
      case InListPredicate(c, _) if joinColumnNames.contains(c) => c
      case RangePredicate(c, _, _) if joinColumnNames.contains(c) => c
    }.toSet

    require(
      joinColumnPredicates.isEmpty,
      s"""Columns specified in both the join on clause and the where clause.
         |Partition key columns are always part of the join clause.
         |Columns in both: ${joinColumnPredicates.mkString(", ")}""".stripMargin
    )


    logDebug("Generating Single Key Query Prepared Statement String")
    logDebug(s"SelectedColumns : ${queryParts.selectedColumnRefs} -- JoinColumnNames : $joinColumnNames")
    val columns = queryParts.selectedColumnRefs.map(_.cql).mkString(", ")
    val joinWhere = joinColumnNames.map(name => s"${CqlIdentifier.fromInternal(name).asCql(true)} = :$name")
    val limitClause = CassandraLimit.limitToClause(queryParts.limitClause)
    val orderBy = queryParts.clusteringOrder.map(_.toCql(tableDef)).getOrElse("")
    val filter = (queryParts.whereClause.predicates ++ joinWhere).mkString(" AND ")
    val quotedKeyspaceName = CqlIdentifier.fromInternal(tableDef.keyspaceName).asCql(true)
    val quotedTableName = CqlIdentifier.fromInternal(tableDef.tableName).asCql(true)
    val query =
      s"SELECT $columns " +
        s"FROM $quotedKeyspaceName.$quotedTableName " +
        s"WHERE $filter $orderBy $limitClause"
    logDebug(s"Query : $query")
    query
  }
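
A minimal sketch of how the returned string is typically consumed, assuming the
enclosing JoinHelper object plus a tableDef, joinColumns and queryParts built
elsewhere are in scope; the session setup and the "store_id" join column are
illustrative only, not the connector's actual call sites:

  import com.datastax.oss.driver.api.core.CqlSession

  // Hypothetical call site: prepare the generated query once, then bind the join
  // column's value by name for each left-side key and execute.
  val session: CqlSession = CqlSession.builder().build()
  val queryString = JoinHelper.getJoinQueryString(tableDef, joinColumns, queryParts)
  val prepared = session.prepare(queryString)        // e.g. SELECT ... WHERE "store_id" = :store_id
  val bound = prepared.bind().setInt("store_id", 42) // matches the :store_id bind marker
  val rows = session.execute(bound)                  // rows for this one join key

Binding by name rather than by position is what the "= :$name" markers produced in
joinWhere make possible.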