override def update()

in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageAsyncHBase.scala [147:200]


  /**
   * Sends the given counts to HBase as a batch of increments and reads the resulting
   * values back from the batch response.
   *
   * One `Increment` is built per `(key, values)` entry: the row key is
   * `BytesUtilV1.toBytes(exactKey)`, and every qualifier adds a column whose family is
   * looked up in `intervalsMap` by the qualifier's interval (`eq.tq.q`).
   *
   * @param policy counter policy; only used to resolve the target table via `getTableName`
   * @param counts row keys paired with the per-qualifier amounts to add
   * @return for each key, the long stored in the cell returned for each requested
   *         qualifier (`-1` when the response carries no cell for it). A key maps to an
   *         empty map when its batch slot holds a `Throwable` or anything that is not a
   *         `Result` (including `null`, which is what every slot stays at if the batch
   *         call itself fails). Keys repeated in `counts` collapse to a single entry.
   */
  override def update(policy: Counter, counts: Seq[(ExactKeyTrait, ExactValueMap)]): Map[ExactKeyTrait, ExactValueMap] = {
    // one Increment per row key, carrying every qualifier of that key
    val increments = counts.map { case (exactKey, values) =>
      val inc = new Increment(BytesUtilV1.toBytes(exactKey))
      values.foreach { case (eq, value) =>
        // Bytes.toBytes encodes as UTF-8 regardless of the JVM's default charset,
        // so the family name is the same on every host
        val family = Bytes.toBytes(intervalsMap(eq.tq.q).toString)
        inc.addColumn(family, BytesUtilV1.toBytes(eq), value)
      }
      inc
    }

    // filled in place by table.batch: one slot per increment, in submission order
    val results: Array[Object] = Array.fill(increments.size)(null)

    withHBase(getTableName(policy)) { table =>
      table.batch(increments, results)
    } match {
      case Failure(ex) =>
        // best effort: a failed batch is logged and the per-slot handling below
        // reports whatever the results array ended up holding
        log.error(s"${ex.getMessage}")
      case _ =>
    }

    assert(counts.length == results.length)

    counts.zip(results).map { case ((exactKey, eqWithValue), result) =>
      val eqWithResult = result match {
        case r: Result =>
          eqWithValue.map { case (eq, _) =>
            // must use the same family encoding as the increment above
            val family = Bytes.toBytes(intervalsMap(eq.tq.q).toString)
            val latest = Option(r.getColumnLatestCell(family, BytesUtilV1.toBytes(eq)))
              .map(cell => Bytes.toLong(CellUtil.cloneValue(cell)))
              .getOrElse(-1L)
            eq -> latest
          }
        case ex: Throwable =>
          // this slot holds an exception instead of a Result
          log.error(s"${ex.getMessage}: $exactKey")
          Nil
        case _ =>
          log.error(s"result is null: $exactKey")
          Nil
      }
      (exactKey, eqWithResult.toMap)
    }.toMap
  }