override def get()

in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/ExactStorageGraph.scala [119:213]


  /**
   * Fetches counts for `items` by posting a `getEdges` query to `s2graphReadOnlyUrl`.
   *
   * One query step is generated for every combination of a `timeRange` entry and a dimension,
   * where the dimensions are the cartesian product of the value sets in `dimQuery`.
   *
   * @param policy    counter policy; `policy.action + labelPostfix` is the label name and
   *                  `policy.service` is sent as the source vertex service name
   * @param items     ids sent as the source vertices of the query
   * @param timeRange (from, to) qualifier pairs; the upper bound is sent as `ts + 1`
   *                  (presumably because the interval end is exclusive on the server - confirm)
   * @param dimQuery  dimension key -> candidate values to combine
   * @return one [[FetchedCountsGrouped]] per distinct key produced by `resultToExactKeyValues`;
   *         `Nil` (after logging a warning) when the endpoint answers with a non-OK status
   * @throws NoSuchElementException if no label is found under the derived label name
   */
  override def get(policy: Counter,
                   items: Seq[String],
                   timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)],
                   dimQuery: Map[String, Set[String]])
                  (implicit ex: ExecutionContext): Future[Seq[FetchedCountsGrouped]] = {
    // Renders a value as a quoted, escaped JSON string literal so that quotes, backslashes or
    // control characters in service/label/dimension names cannot corrupt the request body.
    def jsonStr(value: Any): String = Json.stringify(Json.toJson(s"$value"))

    val labelName = policy.action + labelPostfix
    // Same exception type as a bare `.get`, but the message names the missing label.
    val label = Label.findByName(labelName).getOrElse {
      throw new NoSuchElementException(s"label not found: $labelName")
    }

    val ids = Json.toJson(items)

    // Every combination of one value per dimension key, paired back with its key.
    // Keys and values of the same Map iterate in the same order, so the zip lines up.
    val dimensions = {
      for {
        values <- CartesianProduct(dimQuery.values.map(ss => ss.toList).toList)
      } yield {
        dimQuery.keys.zip(values).toMap
      }
    }

    // One step per (time range, dimension) combination.
    val stepJsLs = {
      for {
        (tqFrom, tqTo) <- timeRange
        dimension <- dimensions
      } yield {
        val eqFrom = core.ExactQualifier(tqFrom, dimension)
        val eqTo = core.ExactQualifier(tqTo, dimension)
        val intervalJs =
          s"""
            |{
            |  "from": {
            |    "_to": ${jsonStr(eqFrom.dimension)},
            |    "time_unit": ${jsonStr(eqFrom.tq.q)},
            |    "time_value": ${eqFrom.tq.ts}
            |  },
            |  "to": {
            |    "_to": ${jsonStr(eqTo.dimension)},
            |    "time_unit": ${jsonStr(eqTo.tq.q)},
            |    "time_value": ${eqTo.tq.ts + 1}
            |  }
            |}
          """.stripMargin
        val stepJs =
          s"""
            |{
            |  "direction": "out",
            |  "limit": -1,
            |  "duplicate": "raw",
            |  "label": ${jsonStr(labelName)},
            |  "interval": $intervalJs
            |}
           """.stripMargin
        stepJs
      }
    }

    val reqJsStr =
      s"""
        |{
        |  "srcVertices": [
        |    {"serviceName": ${jsonStr(policy.service)}, "columnName": ${jsonStr(label.srcColumnName)}, "ids": $ids}
        |  ],
        |  "steps": [
        |    {
        |      "step": [
        |        ${stepJsLs.mkString(",")}
        |      ]
        |    }
        |  ]
        |}
      """.stripMargin

    val reqJs = Json.parse(reqJsStr)

    wsClient.url(s"$s2graphReadOnlyUrl/graphs/getEdges").post(reqJs).map { resp =>
      resp.status match {
        case HttpStatus.SC_OK =>
          val respJs = resp.json
          // Group the (key, (qualifier, value)) pairs by key, then bucket each key's
          // qualifier -> value map by (time unit, dimension key/values).
          val keyWithValues = (respJs \ "results").as[Seq[JsValue]].map { result =>
            resultToExactKeyValues(policy, result)
          }.groupBy(_._1).mapValues { seq =>
            seq.map(_._2).toMap.groupBy { case (exactQualifier, _) =>
              (exactQualifier.tq.q, exactQualifier.dimKeyValues)
            }
          }
          for {
            (k, v) <- keyWithValues.toSeq
          } yield {
            FetchedCountsGrouped(k, v)
          }
        case n: Int =>
          // Best effort: a failed query is logged and reported as "no counts".
          log.warn(s"getEdges status($n): $reqJsStr")
          Nil
      }
    }
  }