in s2counter_loader/src/main/scala/org/apache/s2graph/counter/loader/core/DimensionProps.scala [63:109]
private[counter] def query(bucket: Bucket, item: CounterEtlItem): Future[Option[JsValue]] = {
val keyValues = (item.dimension.as[JsObject] ++ item.property.as[JsObject] fields)
.filter { case (key, _) => key.startsWith("[[") && key.endsWith("]]") }
.map { case (key, jsValue) =>
val replacement = jsValue match {
case JsString(s) => s
case value => value.toString()
}
key -> replacement
}.toList
val cacheKey = s"${bucket.impressionId}=" + keyValues.flatMap(x => Seq(x._1, x._2)).mkString("_")
cache.withCacheAsync(cacheKey) {
val retryFuture = RetryAsync(retryCnt, withSleep = false) {
val future = bucket.httpVerb.toUpperCase match {
case "GET" =>
client.url(bucket.apiPath).get()
case "POST" =>
val newBody = makeRequestBody(bucket.requestBody, keyValues)
client.url(bucket.apiPath).post(Json.parse(newBody))
}
future.map { resp =>
resp.status match {
case HttpStatus.SC_OK =>
val json = Json.parse(resp.body)
for {
results <- (json \ "results").asOpt[Seq[JsValue]]
result <- results.headOption
props <- (result \ "props").asOpt[JsValue]
} yield {
props
}
case _ =>
log.error(s"${resp.body}(${resp.status}}) item: $item")
None
}
}
}
// if fail to retry
retryFuture onFailure { case t => log.error(s"${t.getMessage} item: $item") }
retryFuture
}
}