override def get()

in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala [56:128]


  /**
   * Reads the stored counts of `items` under `policy`, limited to the given qualifier ranges.
   *
   * Steps performed here:
   *  - every item is turned into an `ExactKey` (with `checkItemType = true`);
   *  - one `GetRequest` is issued per (column family, key) pair, where the column families are
   *    the distinct `intervalsMap(from.q)` values of the ranges' starting qualifiers;
   *  - each row result is decoded into a `FetchedCounts` (qualifier -> long value);
   *  - results that share an `exactKey` are merged with `++` into a single `FetchedCounts`.
   *
   * The merged results come out of a `groupBy`, so the order of the returned sequence is not
   * tied to the order of `items`.
   *
   * @param policy    counter policy; determines the table (via `getTableName`) and the keys
   * @param items     item identifiers to look up
   * @param timeRange (from, to) qualifier pairs bounding the columns to read
   * @param ex        implicit execution context (not referenced directly in this body)
   * @return whatever `withAsyncHBase` produces for the grouped, merged lookups
   */
  override def get(policy: Counter,
                   items: Seq[String],
                   timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)])
                  (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {

    val tableName = getTableName(policy)

    // Description of this lookup for ad-hoc debugging; being lazy and never referenced
    // below, it is not evaluated during a normal call.
    lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange"

    val keys = items.map(item => ExactKey(policy, item, checkItemType = true))

    // One column family per distinct interval found among the ranges' starting qualifiers.
    val families = timeRange.map { case (from, _) => intervalsMap(from.q) }.distinct

    val gets = families.flatMap { cf =>
      keys.map { key =>
        val request = new GetRequest(tableName, BytesUtilV1.toBytes(key))
        request.family(cf.toString)
        // A column passes if it falls inside ANY of the requested ranges. The upper bound is
        // exclusive but bumped by one on `ts`, which presumably makes `to` itself inclusive
        // (assumes the encoded qualifiers sort by timestamp -- confirm against BytesUtilV1).
        request.setFilter(new FilterList(
          timeRange.map { case (from, to) =>
            new ColumnRangeFilter(
              BytesUtilV1.toBytes(from), true,
              BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)), false)
          },
          FilterList.Operator.MUST_PASS_ONE))
        (key, cf, request)
      }
    }

    withAsyncHBase[Seq[FetchedCounts]] { client =>
      val deferreds: Seq[Deferred[FetchedCounts]] = gets.map { case (key, _, request) =>
        // Decodes the cells of one row into qualifier -> count for the key that was asked for.
        val decodeRow = new Callback[FetchedCounts, util.ArrayList[KeyValue]] {
          override def call(kvs: util.ArrayList[KeyValue]): FetchedCounts = {
            val qualifierWithCounts = kvs.map { kv =>
              BytesUtilV1.toExactQualifier(kv.qualifier()) -> Bytes.toLong(kv.value())
            }.toMap
            FetchedCounts(key, qualifierWithCounts)
          }
        }
        client.get(request).addCallback(decodeRow)
      }

      // A key is fetched once per column family; fold those partial results into one entry.
      val mergeByKey = new Callback[Seq[FetchedCounts], util.ArrayList[FetchedCounts]] {
        override def call(results: util.ArrayList[FetchedCounts]): Seq[FetchedCounts] = {
          Seq(results: _*).groupBy(_.exactKey).map { case (key, fetchedGroup) =>
            fetchedGroup.reduce[FetchedCounts] { (f1, f2) =>
              FetchedCounts(key, f1.qualifierWithCountMap ++ f2.qualifierWithCountMap)
            }
          }.toSeq
        }
      }

      Deferred.group(deferreds).addCallback(mergeByKey)
    }
  }