in measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala [53:85]
override def data(ms: Long): (Option[DataFrame], TimeRange) = {
val dfOpt =
try {
sparkSession.conf.set("spark.cassandra.connection.host", host)
sparkSession.conf.set("spark.cassandra.connection.port", port)
sparkSession.conf.set("spark.cassandra.auth.username", user)
sparkSession.conf.set("spark.cassandra.auth.password", password)
val tableDef: DataFrameReader = sparkSession.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> tableName, "keyspace" -> database))
val dataWh: String = dataWhere()
var data: DataFrame = null
if (wheres.length > 0) {
data = tableDef.load().where(dataWh)
} else {
data = tableDef.load()
}
val dfOpt = Some(data)
val preDfOpt = preProcess(dfOpt, ms)
preDfOpt
} catch {
case e: Throwable =>
error(s"load cassandra table $database.$TableName fails: ${e.getMessage}", e)
None
}
val tmsts = readTmst(ms)
(dfOpt, TimeRange(ms, tmsts))
}