private[counter] def query()

in s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala [63:109]


  /**
   * Looks up dimension properties for `item` by calling the HTTP API configured on `bucket`.
   *
   * Template variables are the fields of `item.dimension` merged with `item.property` whose
   * key both starts and ends with double square brackets. A JSON string contributes its raw
   * content; any other JSON value contributes its `toString` rendering. The lookup goes
   * through `cache`, keyed by the bucket's impression id followed by those key/value pairs.
   *
   * @param bucket supplies the HTTP verb, API path, request body template and impression id
   * @param item   counter item whose dimension and property JSON objects provide the variables
   * @return the `props` value of the first element of `results` in a 200 response, or `None`
   *         when that path is missing or the status is not 200. An HTTP verb other than
   *         GET/POST yields a failed future instead of a raw `MatchError`.
   */
  private[counter] def query(bucket: Bucket, item: CounterEtlItem): Future[Option[JsValue]] = {
    val mergedFields = (item.dimension.as[JsObject] ++ item.property.as[JsObject]).fields

    // Keep only the template variables and render each value as a plain string.
    val keyValues = mergedFields.collect {
      case (key, jsValue) if key.startsWith("[[") && key.endsWith("]]") =>
        val replacement = jsValue match {
          case JsString(s) => s
          case value => value.toString()
        }
        key -> replacement
    }.toList

    val cacheKey =
      s"${bucket.impressionId}=" + keyValues.flatMap { case (k, v) => Seq(k, v) }.mkString("_")

    cache.withCacheAsync(cacheKey) {
      // NOTE(review): presumably RetryAsync re-runs this block up to retryCnt times — confirm.
      val retryFuture = RetryAsync(retryCnt, withSleep = false) {
        val future = bucket.httpVerb.toUpperCase match {
          case "GET" =>
            client.url(bucket.apiPath).get()
          case "POST" =>
            // Only POST substitutes the template variables into the request body.
            val newBody = makeRequestBody(bucket.requestBody, keyValues)
            client.url(bucket.apiPath).post(Json.parse(newBody))
          case _ =>
            // Fail with context rather than letting a bare MatchError escape.
            Future.failed(new IllegalArgumentException(
              s"unsupported http verb: ${bucket.httpVerb} item: $item"))
        }

        future.map { resp =>
          resp.status match {
            case HttpStatus.SC_OK =>
              val json = Json.parse(resp.body)
              for {
                results <- (json \ "results").asOpt[Seq[JsValue]]
                result <- results.headOption
                props <- (result \ "props").asOpt[JsValue]
              } yield props
            case _ =>
              log.error(s"${resp.body}(${resp.status}) item: $item")
              None
          }
        }
      }

      // Log the final failure; the failed future itself is still handed back to the cache.
      retryFuture onFailure { case t => log.error(s"${t.getMessage} item: $item") }

      retryFuture
    }
  }