private def checkAndPrepareDimensionBucket()

in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v2/RankingStorageGraph.scala [281:345]


  /**
   * Makes sure the "s2counter_topK_bucket" edges from the ranking key's dimension to its
   * bucket shards are present in s2graph, inserting one edge per shard when the existence
   * check comes back empty.
   *
   * The whole check-and-insert runs inside `prepareCache.withCache`, keyed by
   * `"<dimension>:<bucketKey>"` (presumably so a key is probed only once while the entry is
   * cached -- confirm against the cache implementation).
   *
   * @param rankingKey key supplying the dimension, the time qualifier and the shard keys
   * @return `true` when the edge was found or the insert answered with HTTP 200; `false` when
   *         the value obtained through the cache is `None`, which is what an `Exception` in
   *         either HTTP exchange is mapped to
   * @note The 10 second `Await.result` sits outside the future's `recover`, so a timeout is
   *       thrown from the block handed to `withCache` instead of being mapped to `None`.
   */
  private def checkAndPrepareDimensionBucket(rankingKey: RankingKey): Boolean = {
    val dimension = rankingKey.eq.dimension
    val cacheKey = s"$dimension:${makeBucketKey(rankingKey)}"
    val bucketLabel = "s2counter_topK_bucket"

    // Asks the read-only endpoint whether the edge to the LAST shard exists; only that one
    // shard is probed (presumably because all shards are inserted in one request -- confirm).
    def lastShardEdgeExists(): Future[Boolean] = {
      val probe = Json.arr(
        Json.obj(
          "label" -> bucketLabel,
          "direction" -> "out",
          "from" -> dimension,
          "to" -> makeBucketShardKey(BUCKET_SHARD_COUNT - 1, rankingKey)
        )
      )
      wsClient.url(s"$s2graphReadOnlyUrl/graphs/checkEdges").post(probe).map { resp =>
        if (resp.status != HttpStatus.SC_OK) {
          // Any non-200 answer fails the future; the caller's recover turns it into None.
          throw new Exception(s"failed checkEdges. ${resp.body} ${resp.status}")
        }
        resp.json.as[Seq[JsValue]].nonEmpty
      }
    }

    // Posts one edge per shard index in [0, BUCKET_SHARD_COUNT) to the write endpoint.
    def insertShardEdges(): Future[Option[Boolean]] = {
      val tq = rankingKey.eq.tq
      val edges = (0 until BUCKET_SHARD_COUNT).map { shard =>
        Json.obj(
          "timestamp" -> tq.ts,
          "from" -> dimension,
          "to" -> makeBucketShardKey(shard, rankingKey),
          "label" -> bucketLabel,
          "props" -> Json.obj(
            "time_unit" -> tq.q.toString,
            "date_time" -> tq.dateTime
          )
        )
      }
      wsClient.url(s"$s2graphUrl/graphs/edges/insert").post(Json.toJson(edges)).map { resp =>
        if (resp.status != HttpStatus.SC_OK) {
          throw new Exception(s"failed insertEdges. ${resp.body} ${resp.status}")
        }
        Some(true)
      }
    }

    val prepared = prepareCache.withCache(cacheKey) {
      val outcome = lastShardEdgeExists()
        .flatMap { exists =>
          if (exists) Future.successful(Some(true)) else insertShardEdges()
        }
        .recover {
          // Failures of either exchange are deliberately collapsed to "not prepared".
          case _: Exception => None
        }

      // Blocks the calling thread until both exchanges finish (or 10 seconds elapse).
      Await.result(outcome, 10.seconds)
    }
    prepared.getOrElse(false)
  }