override def get()

in s2counter_core/src/main/scala/org/apache/s2graph/counter/core/v1/ExactStorageHBase.scala [56:113]


  /**
   * Reads exact counter values from HBase for `items` over the requested time ranges.
   *
   * One `Get` is built per item. Each `Get` is restricted to the column families that
   * `intervalsMap` yields for the `q` of every range's lower bound, and carries a
   * MUST_PASS_ONE filter list with one column-range filter per `(from, to)` pair.
   * The lower bound is inclusive; the upper bound is `to` with `ts + 1`, exclusive
   * (presumably so that `to` itself is included -- confirm against the qualifier encoding).
   *
   * The HBase call runs inside a `Future` on `ec`. If `withHBase` reports a failure, the
   * error is logged and an empty result is returned instead of a failed future.
   *
   * @param policy    counter policy; used to build row keys, select the table and label log output
   * @param items     item identifiers to look up; an empty sequence short-circuits to an empty result
   * @param timeRange `(from, to)` qualifier pairs describing the column ranges to read
   * @param ec        execution context on which the HBase lookup runs
   * @return one `FetchedCounts` per item whose row returned at least one cell
   */
  override def get(policy: Counter,
                   items: Seq[String],
                   timeRange: Seq[(core.TimedQualifier, core.TimedQualifier)])
                  (implicit ec: ExecutionContext): Future[Seq[FetchedCounts]] = {
    if (items.isEmpty) {
      // Nothing to look up: the batch get would return no rows, so skip the HBase round trip.
      Future.successful(Seq.empty[FetchedCounts])
    } else {
      lazy val messageForLog = s"${policy.service}.${policy.action} $items $timeRange"

      val keys = items.map(item => ExactKey(policy, item, checkItemType = true))

      // Both values depend only on timeRange, so compute them once rather than once per key.
      // Bytes.toBytes encodes as UTF-8 regardless of the JVM's default charset.
      val families = timeRange
        .map(t => intervalsMap(t._1.q))
        .distinct
        .map(cf => Bytes.toBytes(cf.toString))
      val columnRanges = {
        for {
          (from, to) <- timeRange
        } yield {
          // Upper bound is bumped by one and used as an exclusive limit below.
          (BytesUtilV1.toBytes(from), BytesUtilV1.toBytes(to.copy(ts = to.ts + 1)))
        }
      }

      val gets = {
        for {
          key <- keys
        } yield {
          val get = new Get(BytesUtilV1.toBytes(key))
          families.foreach { family =>
            get.addFamily(family)
          }
          get.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ONE, {
            for {
              (minColumn, maxColumn) <- columnRanges
            } yield {
              new ColumnRangeFilter(minColumn, true, maxColumn, false)
            }
          }))
        }
      }

      Future {
        withHBase(getTableName(policy)) { table =>
          for {
            // Results are paired with keys by position; rows with no cells are dropped.
            (rst, key) <- table.get(gets).zip(keys) if !rst.isEmpty
          } yield {
            val qualifierWithCounts = {
              for {
                cell <- rst.listCells()
                qualifier = BytesUtilV1.toExactQualifier(CellUtil.cloneQualifier(cell))
              } yield {
                qualifier -> Bytes.toLong(CellUtil.cloneValue(cell))
              }
            }.toMap
            FetchedCounts(key, qualifierWithCounts)
          }
        } match {
          case Success(rst) => rst
          case Failure(ex) =>
            // Pass the throwable as well so the stack trace is not lost.
            log.error(s"$ex: $messageForLog", ex)
            Nil
        }
      }
    }
  }