override def data()

in measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/CassandraDataConnector.scala [53:85]


  /**
   * Loads the configured Cassandra table as a DataFrame for the given timestamp.
   *
   * Sets the Cassandra connection settings on the Spark session, reads
   * `database.tableName` through the `org.apache.spark.sql.cassandra` source,
   * filters it with `dataWhere()` when `wheres` is non-empty, and passes the
   * result through `preProcess`.
   *
   * @param ms timestamp handed to `preProcess`, `readTmst` and the returned `TimeRange`
   * @return the pre-processed DataFrame (`None` when loading failed; the failure is
   *         logged) paired with `TimeRange(ms, readTmst(ms))`
   */
  override def data(ms: Long): (Option[DataFrame], TimeRange) = {

    val dfOpt =
      try {
        // Connection settings are written to the session conf before the read is built.
        sparkSession.conf.set("spark.cassandra.connection.host", host)
        sparkSession.conf.set("spark.cassandra.connection.port", port)
        sparkSession.conf.set("spark.cassandra.auth.username", user)
        sparkSession.conf.set("spark.cassandra.auth.password", password)

        val tableDef: DataFrameReader = sparkSession.read
          .format("org.apache.spark.sql.cassandra")
          .options(Map("table" -> tableName, "keyspace" -> database))

        // Evaluated unconditionally, exactly as before, even if no filter ends up applied.
        val dataWh: String = dataWhere()

        // Apply the where clause only when filter conditions are configured.
        val loaded: DataFrame = tableDef.load()
        val data: DataFrame = if (wheres.length > 0) loaded.where(dataWh) else loaded

        preProcess(Some(data), ms)
      } catch {
        // Any failure is logged and mapped to None instead of being rethrown.
        // NOTE(review): Throwable also covers fatal JVM errors; consider
        // scala.util.control.NonFatal if swallowing those is not intended.
        case e: Throwable =>
          // Report the table that was actually requested (`tableName`, as used in the
          // reader options above) rather than the `TableName` member.
          error(s"load cassandra table $database.$tableName fails: ${e.getMessage}", e)
          None
      }
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }