in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala [56:128]
/**
 * Fetches the stored counts of `items` for the requested time ranges.
 *
 * One `GetRequest` is issued per (column family, item key) pair, where the column
 * families are the distinct values of `intervalsMap` applied to the `q` of each range's
 * lower bound. Every request carries the same `MUST_PASS_ONE` filter list holding one
 * `ColumnRangeFilter` per time range. The per-request results are then grouped by their
 * `ExactKey` and their qualifier-to-count maps are merged with `++`.
 *
 * NOTE(review): the Java collections handed to / received from asynchbase are used as
 * Scala collections (and vice versa) without explicit conversion, so this method depends
 * on implicit Java/Scala collection conversions being in scope in this file.
 *
 * @param policy    counter definition; used to resolve the table name and to build the keys
 * @param items     item identifiers to look up; each is turned into an `ExactKey` with
 *                  `checkItemType = true`
 * @param timeRange `(from, to)` qualifier pairs; `from` is an inclusive lower column bound,
 *                  and the upper column bound is `to` with its `ts` advanced by one, exclusive
 * @param ex        execution context supplied by the caller
 * @return one `FetchedCounts` per distinct key that was requested. The order of the
 *         sequence is the iteration order of the map produced by `groupBy`, not the order
 *         of `items`.
 */
override def get(policy: Counter,
                 items: Seq[String],
                 timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)])
                (implicit ex: ExecutionContext): Future[Seq[FetchedCounts]] = {
  val tableName = getTableName(policy)
  val keys = items.map(item => ExactKey(policy, item, checkItemType = true))

  // Distinct column families addressed by the lower bounds of the requested ranges.
  val columnFamilies = timeRange.map { case (from, _) => intervalsMap(from.q) }.distinct

  // Serialised (inclusive start, exclusive stop) column bounds, one pair per time range.
  // They do not depend on the column family or the key, so they are computed once and
  // reused by every request. Kept lazy so that nothing is serialised when no request is
  // built (no items or no time ranges).
  lazy val columnBounds = timeRange.map { case (from, to) =>
    (BytesUtilV1.toBytes(from), BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)))
  }

  val gets = {
    for {
      cf <- columnFamilies
      key <- keys
    } yield {
      val request = new GetRequest(tableName, BytesUtilV1.toBytes(key))
      request.family(cf.toString)
      // A column is returned when it falls inside at least one of the requested ranges.
      request.setFilter(new FilterList({
        for {
          (start, stop) <- columnBounds
        } yield {
          new ColumnRangeFilter(start, true, stop, false)
        }
      }, FilterList.Operator.MUST_PASS_ONE))
      (key, cf, request)
    }
  }

  withAsyncHBase[Seq[FetchedCounts]] { client =>
    // Issue every request and decode each response into the counts of its key.
    val deferreds: Seq[Deferred[FetchedCounts]] = {
      for {
        (key, _, request) <- gets
      } yield {
        client.get(request).addCallback { new Callback[FetchedCounts, util.ArrayList[KeyValue]] {
          override def call(kvs: util.ArrayList[KeyValue]): FetchedCounts = {
            val qualifierWithCounts = {
              for {
                kv <- kvs
              } yield {
                BytesUtilV1.toExactQualifier(kv.qualifier()) -> Bytes.toLong(kv.value())
              }
            }.toMap
            FetchedCounts(key, qualifierWithCounts)
          }
        }}
      }
    }

    // A key is queried once per column family; fold those partial results into a single
    // FetchedCounts per key. On a duplicate qualifier the later map's value wins (`++`).
    Deferred.group(deferreds).addCallback { new Callback[Seq[FetchedCounts], util.ArrayList[FetchedCounts]] {
      override def call(arg: util.ArrayList[FetchedCounts]): Seq[FetchedCounts] = {
        for {
          (key, fetchedGroup) <- Seq(arg: _*).groupBy(_.exactKey)
        } yield {
          fetchedGroup.reduce[FetchedCounts] { (merged, next) =>
            FetchedCounts(key, merged.qualifierWithCountMap ++ next.qualifierWithCountMap)
          }
        }
      }.toSeq
    }}
  }
}