in measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala [102:146]
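/**
 * Runs an Elasticsearch search for the most recent documents of `metricName`
 * and converts the numeric fields of each hit into rows of a DataFrame,
 * returning it together with the time range read for `ms`.
 */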
def dataBySearch(ms: Long): (Option[DataFrame], TimeRange) = {
  val path: String =
    s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
  info(s"ElasticSearchGriffinDataConnector data: host: $host, port: $port, path: $path")
  val dfOpt =
    try {
      val (ok, body) = httpGet(path)
      val data = ArrayBuffer[Map[String, Number]]()
      if (ok) {
        // Each hit carries its metric values under "_source.value"; collect
        // every numeric field of every hit into one row map.
        val hits: JavaIterator[JsonNode] =
          parseString(body).get("hits").get("hits").elements()
        while (hits.hasNext) {
          val hit = hits.next()
          val values = hit.get("_source").get("value")
          val hitFields: JavaIterator[JavaMap.Entry[String, JsonNode]] = values.fields()
          val fieldsMap = mutable.Map[String, Number]()
          while (hitFields.hasNext) {
            val fld: JavaMap.Entry[String, JsonNode] = hitFields.next()
            fieldsMap.put(fld.getKey, fld.getValue.numberValue())
          }
          data += fieldsMap.toMap
        }
      }
      // Build rows in the configured column order, padding fields that are
      // missing from a hit with 0.0 so every row matches the schema.
      val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
      val columns: Array[String] = fields.toArray
      val defaultNumber: Number = 0.0
      val rdd: RDD[Row] = rdd1.map { x: Map[String, Number] =>
        Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
      }
      val schema = dfSchema(columns.toList)
      val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
      preProcess(Some(df), ms)
    } catch {
      case e: Throwable =>
        error(s"load ES table $host:$port $index/$dataType fails: ${e.getMessage}", e)
        None
    }
  // Pair the frame (or None on failure) with the time range for `ms`.
  val tmsts = readTmst(ms)
  (dfOpt, TimeRange(ms, tmsts))
}
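
// A minimal sketch of what the dfSchema helper used above could look like,
// assuming every metric column is a nullable Double (rows are built with
// .doubleValue()). Illustrative only; the real helper is defined elsewhere
// in this connector.
//
// import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
//
// def dfSchema(columnNames: List[String]): StructType =
//   StructType(columnNames.map(name => StructField(name, DoubleType, nullable = true)))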