in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala [147:200]
/**
 * Applies the given counts to HBase as one batch of `Increment` mutations and
 * returns the counter values read back from the batch results.
 *
 * @param policy counter policy; used to resolve the target table via `getTableName`
 * @param counts pairs of row key and the (qualifier -> delta) values to add to that row
 * @return for every key in `counts`, a map of qualifier -> value taken from the
 *         latest cell of the increment result, or `-1` when the result holds no
 *         cell for that qualifier. A key whose batch slot is a `Throwable` or
 *         `null` is mapped to an empty map. When the same key occurs more than
 *         once in `counts`, the final `toMap` keeps only its last entry.
 */
override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = {
  // Build one Increment per row key, with one column per (qualifier, delta) pair.
  val increments = counts.map { case (exactKey, values) =>
    val inc = new Increment(BytesUtilV1.toBytes(exactKey))
    values.foreach { case (qualifier, delta) =>
      // Bytes.toBytes encodes as UTF-8; String.getBytes without a charset would
      // depend on the JVM's platform default encoding.
      inc.addColumn(Bytes.toBytes(intervalsMap(qualifier.tq.q).toString), BytesUtilV1.toBytes(qualifier), delta)
    }
    inc
  }

  // table.batch writes the outcome of the i-th action into results(i).
  val results: Array[Object] = Array.fill(increments.size)(null)
  withHBase(getTableName(policy)) { table =>
    table.batch(increments, results)
  } match {
    case Failure(ex) =>
      // Only logged: the per-slot handling below deals with whatever was left in `results`.
      log.error(s"${ex.getMessage}")
    case _ =>
  }

  // Sanity check: there must be exactly one result slot per requested key.
  assert(counts.length == results.length)

  val updated = counts.zip(results).map { case ((exactKey, eqWithValue), result) =>
    val eqWithResult = result match {
      case r: Result =>
        // Read back the current value of every qualifier that was incremented.
        eqWithValue.map { case (qualifier, _) =>
          val family = Bytes.toBytes(intervalsMap(qualifier.tq.q).toString)
          val latest = Option(r.getColumnLatestCell(family, BytesUtilV1.toBytes(qualifier)))
            .map(cell => Bytes.toLong(CellUtil.cloneValue(cell)))
            .getOrElse(-1L)
          qualifier -> latest
        }
      case ex: Throwable =>
        // This slot of the batch holds an exception instead of a Result.
        log.error(s"${ex.getMessage}: $exactKey")
        Nil
      case _ =>
        // Slot was never filled in (still null).
        log.error(s"result is null: $exactKey")
        Nil
    }
    (exactKey, eqWithResult.toMap)
  }
  updated.toMap
}