def dataBySearch(ms: Long)

in measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala [102:146]


  /**
   * Queries Elasticsearch for the latest `size` documents whose name matches
   * `metricName`, flattens each hit's `_source.value` object into a row of
   * numbers, and returns the resulting DataFrame together with its time range.
   */
  def dataBySearch(ms: Long): (Option[DataFrame], TimeRange) = {
    // Sort by timestamp descending so the newest metric values come first.
    val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
    info(s"ElasticSearchGriffinDataConnector data: host: $host, port: $port, path: $path")

    val dfOpt =
      try {
        // httpGet returns a (success, responseBody) pair.
        val answer = httpGet(path)
        val data = ArrayBuffer[Map[String, Number]]()

        if (answer._1) {
          // Iterate over the "hits.hits" array of the search response.
          val arrayAnswers: JavaIterator[JsonNode] =
            parseString(answer._2).get("hits").get("hits").elements()

          while (arrayAnswers.hasNext) {
            val hit = arrayAnswers.next()
            val values = hit.get("_source").get("value")
            // Flatten the "_source.value" object into a field-name -> number map.
            val fields: JavaIterator[JavaMap.Entry[String, JsonNode]] = values.fields()
            val fieldsMap = mutable.Map[String, Number]()
            while (fields.hasNext) {
              val fld: JavaMap.Entry[String, JsonNode] = fields.next()
              fieldsMap.put(fld.getKey, fld.getValue.numberValue())
            }
            data += fieldsMap.toMap
          }
        }
        // Build one row per document over the connector's configured field
        // list; fields missing from a document default to 0.0 so every row
        // has the same arity.
        val rdd1: RDD[Map[String, Number]] = sparkSession.sparkContext.parallelize(data)
        val columns: Array[String] = fields.toArray
        val defaultNumber: Number = 0.0
        val rdd: RDD[Row] = rdd1
          .map { x: Map[String, Number] =>
            Row(columns.map(c => x.getOrElse(c, defaultNumber).doubleValue()): _*)
          }
        val schema = dfSchema(columns.toList)
        val df: DataFrame = sparkSession.createDataFrame(rdd, schema).limit(size)
        preProcess(Some(df), ms)
      } catch {
        case e: Throwable =>
          error(s"load ES table $host:$port $index/$dataType fails: ${e.getMessage}", e)
          None
      }
    // Pair the DataFrame with the timestamps recorded for this trigger time.
    val tmsts = readTmst(ms)
    (dfOpt, TimeRange(ms, tmsts))
  }
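
Because every field value is coerced with `doubleValue()`, the schema returned by `dfSchema` must describe a flat row of double columns. The helper itself is defined elsewhere in the connector; a minimal sketch of what it plausibly looks like, assuming all columns are non-nullable doubles (this sketch is an illustration, not the connector's actual code):

  import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

  // Hypothetical sketch, not the connector's actual implementation: one
  // non-nullable DoubleType column per metric field name, matching the
  // doubleValue() coercion applied when the rows are built above.
  def dfSchema(columnNames: List[String]): StructType =
    StructType(columnNames.map(name => StructField(name, DoubleType, nullable = false)))

With a configured field list of, say, `total` and `miss`, each Elasticsearch hit becomes a two-column row of doubles, and any field absent from a document surfaces as 0.0 rather than null.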