in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala [281:345]
/**
 * Makes sure the "s2counter_topK_bucket" edges from the ranking key's dimension to each of
 * its bucket shards exist, inserting them when the existence check comes back empty.
 *
 * The work runs inside `prepareCache.withCache`, keyed by "dimension:bucketKey".
 * NOTE(review): how `withCache` treats a `None` result (cached or retried) is decided by the
 * cache implementation, which is not visible from this method - confirm.
 *
 * Existence is probed only for the last shard (index `BUCKET_SHARD_COUNT - 1`); when that
 * probe returns an empty array, edges for every shard index are (re)inserted in one request.
 *
 * Failure handling: a non-200 response, an unparsable body, any other `Exception` raised by
 * the requests, and a timeout while waiting for the result all yield `None` inside the cache
 * block, so the method returns `false` instead of throwing.
 *
 * @param rankingKey key providing the dimension, bucket key, shard keys and time qualifier
 * @return `true` when the edges were found or successfully inserted, `false` otherwise
 */
private def checkAndPrepareDimensionBucket(rankingKey: RankingKey): Boolean = {
  val dimension = rankingKey.eq.dimension
  val bucketKey = makeBucketKey(rankingKey)
  val labelName = "s2counter_topK_bucket"

  // Asks the read-only endpoint whether the edge to the last shard is present.
  // A non-200 status is raised as an exception so that the caller's recover maps it to None.
  def lastShardEdgeExists(): Future[Boolean] = {
    val checkReqJs = Json.arr(
      Json.obj(
        "label" -> labelName,
        "direction" -> "out",
        "from" -> dimension,
        "to" -> makeBucketShardKey(BUCKET_SHARD_COUNT - 1, rankingKey)
      )
    )
    wsClient.url(s"$s2graphReadOnlyUrl/graphs/checkEdges").post(checkReqJs).map { resp =>
      resp.status match {
        case HttpStatus.SC_OK =>
          resp.json.as[Seq[JsValue]].nonEmpty
        case _ =>
          throw new Exception(s"failed checkEdges. ${resp.body} ${resp.status}")
      }
    }
  }

  // Inserts one edge per shard index in a single bulk request.
  def insertShardEdges(): Future[Option[Boolean]] = {
    val insertReqJsLs = (0 until BUCKET_SHARD_COUNT).map { i =>
      Json.obj(
        "timestamp" -> rankingKey.eq.tq.ts,
        "from" -> dimension,
        "to" -> makeBucketShardKey(i, rankingKey),
        "label" -> labelName,
        "props" -> Json.obj(
          "time_unit" -> rankingKey.eq.tq.q.toString,
          "date_time" -> rankingKey.eq.tq.dateTime
        )
      )
    }
    wsClient.url(s"$s2graphUrl/graphs/edges/insert").post(Json.toJson(insertReqJsLs)).map { resp =>
      resp.status match {
        case HttpStatus.SC_OK =>
          Some(true)
        case _ =>
          throw new Exception(s"failed insertEdges. ${resp.body} ${resp.status}")
      }
    }
  }

  val prepared = prepareCache.withCache(s"$dimension:$bucketKey") {
    val future: Future[Option[Boolean]] = lastShardEdgeExists().flatMap { exists =>
      if (exists) Future.successful(Option(true)) else insertShardEdges()
    }.recover {
      // Any request/parse failure means "not prepared"; it is reported as None, not rethrown.
      case _: Exception => None
    }

    try {
      Await.result(future, 10.seconds)
    } catch {
      // Await's timeout is thrown on this thread, outside the Future, so recover above never
      // sees it. Treat it like every other failure instead of letting it escape the method.
      case _: java.util.concurrent.TimeoutException => None
    }
  }

  prepared.getOrElse(false)
}