in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala [222:272]
/**
 * Queries `$s2graphReadOnlyUrl/graphs/getEdges` for the edges belonging to a ranking key.
 *
 * The query has a single source-vertex entry whose ids are the bucket shard keys for every
 * shard index in `[0, BUCKET_SHARD_COUNT)`, and a single "out" step on the label
 * `<counter action> + labelPostfix`. The interval's `from` and `to` are both set to the
 * key's time unit and time value.
 *
 * @param key       ranking key; `policyId` selects the counter, `eq.tq` supplies the interval
 * @param duplicate value written into the step's "duplicate" field (defaults to "first")
 * @return the response's "results" array, or `Nil` when that field is absent or cannot be
 *         read as a list of JSON values; the future fails with a `RuntimeException` when
 *         the response status is not `SC_OK`
 * @throws NoSuchElementException synchronously (before any request is sent) when no counter
 *                                is found for `key.policyId`
 */
private def getEdges(key: RankingKey, duplicate: String = "first"): Future[List[JsValue]] = {
  // Fail with a message that names the missing policy instead of a bare `.get` failure.
  val counter = counterModel.findById(key.policyId).getOrElse {
    throw new NoSuchElementException(s"failed getEdges. counter not found for policyId: ${key.policyId}")
  }
  val labelName = counter.action + labelPostfix

  // One quoted id per bucket shard, joined into the body of a JSON array.
  val ids = (0 until BUCKET_SHARD_COUNT)
    .map { shardIdx => s""""${makeBucketShardKey(shardIdx, key)}"""" }
    .mkString(",")

  // NOTE(review): values are interpolated into the JSON text without escaping; a quote or
  // backslash in the label name or `duplicate` would make Json.parse below fail.
  val strJs =
    s"""
       |{
       | "srcVertices": [
       | {
       | "serviceName": "$SERVICE_NAME",
       | "columnName": "$BUCKET_COLUMN_NAME",
       | "ids": [$ids]
       | }
       | ],
       | "steps": [
       | {
       | "step": [
       | {
       | "label": "$labelName",
       | "duplicate": "$duplicate",
       | "direction": "out",
       | "offset": 0,
       | "limit": -1,
       | "interval": {
       | "from": {"time_unit": "${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}},
       | "to": {"time_unit": "${key.eq.tq.q.toString}", "time_value": ${key.eq.tq.ts}}
       | },
       | "scoring": {"score": 1}
       | }
       | ]
       | }
       | ]
       |}
       |""".stripMargin
  log.debug(strJs)

  val payload = Json.parse(strJs)
  wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(payload).map { resp =>
    resp.status match {
      case HttpStatus.SC_OK =>
        // A missing or differently shaped "results" field is treated as "no edges".
        (resp.json \ "results").asOpt[List[JsValue]].getOrElse(Nil)
      case _ =>
        throw new RuntimeException(s"failed getEdges. errCode: ${resp.status}, body: ${resp.body}, query: $payload")
    }
  }
}