in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala [119:213]
/**
 * Fetches exact counts for `items` by posting a `getEdges` query to the
 * s2graph read-only endpoint.
 *
 * One query step is generated for every combination of a time range in
 * `timeRange` and a dimension assignment drawn from the cartesian product
 * of the value sets in `dimQuery`.
 *
 * The request payload is built with the JSON API rather than string
 * interpolation so that quotes or backslashes in service, label, column or
 * dimension values are escaped instead of corrupting the query.
 *
 * @param policy    counter policy; `policy.action + labelPostfix` is the label
 *                  that is queried and `policy.service` is the source service name
 * @param items     ids of the source vertices to query
 * @param timeRange (from, to) qualifier pairs, one interval per pair
 * @param dimQuery  dimension key -> set of values to query
 * @return one [[FetchedCountsGrouped]] per key returned by `resultToExactKeyValues`,
 *         with its values grouped by (interval unit, dimension key/values);
 *         an empty sequence when the endpoint answers with a non-200 status
 * @throws NoSuchElementException if the label cannot be found
 */
override def get(policy: Counter,
                 items: Seq[String],
                 timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)],
                 dimQuery: Map[String, Set[String]])
                (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
  val labelName = policy.action + labelPostfix
  // Same exception type as the former `Option.get`, but with a message that names the label.
  val label = Label.findByName(labelName).getOrElse {
    throw new NoSuchElementException(s"label not found: $labelName")
  }

  // Every combination of one value per dimension key.
  val dimensions = CartesianProduct(dimQuery.values.map(_.toList).toList).map { values =>
    dimQuery.keys.zip(values).toMap
  }

  val steps = for {
    (tqFrom, tqTo) <- timeRange
    dimension <- dimensions
  } yield {
    val eqFrom = core.ExactQualifier(tqFrom, dimension)
    val eqTo = core.ExactQualifier(tqTo, dimension)
    val stepJs: JsValue = Json.obj(
      "direction" -> "out",
      "limit" -> -1,
      "duplicate" -> "raw",
      "label" -> labelName,
      "interval" -> Json.obj(
        "from" -> Json.obj(
          "_to" -> eqFrom.dimension,
          "time_unit" -> eqFrom.tq.q.toString,
          "time_value" -> eqFrom.tq.ts
        ),
        "to" -> Json.obj(
          "_to" -> eqTo.dimension,
          "time_unit" -> eqTo.tq.q.toString,
          // Upper bound is shifted by one; presumably the interval end is exclusive - TODO confirm.
          "time_value" -> (eqTo.tq.ts + 1)
        )
      )
    )
    stepJs
  }

  val reqJs: JsValue = Json.obj(
    "srcVertices" -> Json.arr(
      Json.obj(
        "serviceName" -> policy.service,
        "columnName" -> label.srcColumnName,
        "ids" -> Json.toJson(items)
      )
    ),
    "steps" -> Json.arr(
      Json.obj("step" -> Json.toJson(steps))
    )
  )

  wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(reqJs).map { resp =>
    resp.status match {
      case HttpStatus.SC_OK =>
        (resp.json \ "results").as[Seq[JsValue]]
          .map(result => resultToExactKeyValues(policy, result))
          .groupBy(_._1)
          .toSeq
          .map { case (key, keyed) =>
            val grouped = keyed.map(_._2).toMap.groupBy { case (eq, _) => (eq.tq.q, eq.dimKeyValues) }
            FetchedCountsGrouped(key, grouped)
          }
      case n: Int =>
        log.warn(s"getEdges status($n): $reqJs")
        Nil
    }
  }
}