in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageHBase.scala [56:113]
/**
 * Reads the stored counts of the given items for the requested qualifier ranges.
 *
 * One HBase `Get` is built per item. Each `Get` is restricted to the column families
 * that `intervalsMap` yields for the `q` of every range's `from` qualifier, and to
 * columns falling inside at least one of the requested ranges (`MUST_PASS_ONE`).
 * The table read itself runs inside the returned `Future`.
 *
 * Rows that come back empty are omitted from the result. If the table access fails,
 * the error is logged and an empty result is returned instead of a failed `Future`.
 *
 * @param policy    counter definition used to build the row keys and resolve the table name
 * @param items     item identifiers; one row key is built per item
 * @param timeRange `(from, to)` qualifier pairs; `from` is an inclusive lower bound and
 *                  the upper bound is `to` with `ts + 1`, exclusive
 * @param ec        execution context on which the HBase read is executed
 * @return the fetched counts, one entry per item whose row was not empty
 */
override def get(policy: Counter,
                 items: Seq[String],
                 timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)])
                (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = {
  lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange"

  val keys = items.map(item => ExactKey(policy, item, checkItemType = true))

  // The families and column bounds depend only on timeRange, so they are computed once
  // and shared by every Get. They are lazy so that nothing is evaluated when there are
  // no items, exactly as before.
  // Family names are encoded explicitly as UTF-8 rather than with the platform default
  // charset, so the bytes sent to HBase do not depend on the JVM's configuration.
  lazy val families = timeRange
    .map { case (from, _) => intervalsMap(from.q) }
    .distinct
    .map(_.toString.getBytes(java.nio.charset.StandardCharsets.UTF_8))
  // Upper bound is `to` shifted by one ts and used as an exclusive limit, which keeps
  // columns at `to.ts` itself inside the range.
  lazy val columnBounds = timeRange.map { case (from, to) =>
    (BytesUtilV1.toBytes(from), BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)))
  }

  val gets = keys.map { key =>
    val rowGet = new Get(BytesUtilV1.toBytes(key))
    families.foreach(family => rowGet.addFamily(family))
    // setFilter returns the Get itself, which is the value collected into `gets`.
    rowGet.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE,
      columnBounds.map { case (lower, upper) =>
        new ColumnRangeFilter(lower, true, upper, false)
      }))
  }

  Future {
    withHBase(getTableName(policy)) { table =>
      for {
        // Results are paired with keys by position.
        // NOTE(review): assumes table.get returns results in request order - confirm.
        (result, key) <- table.get(gets).zip(keys) if !result.isEmpty
      } yield {
        val countsByQualifier = {
          for {
            cell <- result.listCells()
            qualifier = BytesUtilV1.toExactQualifier(CellUtil.cloneQualifier(cell))
          } yield {
            qualifier -> Bytes.toLong(CellUtil.cloneValue(cell))
          }
        }.toMap
        FetchedCounts(key, countsByQualifier)
      }
    } match {
      case Success(fetched) => fetched
      case Failure(ex) =>
        // Best-effort read: report the failure (with its stack trace) and fall back to
        // an empty result.
        log.error(s"$ex: $messageForLog", ex)
        Nil
    }
  }
}