in s2rest_play/app/org/apache/s2graph/rest/play/controllers/CounterController.scala [608:692]
/**
 * Builds one ranking per dimension from decayed counts summed over a range of ranking keys.
 *
 * For every `(dimension, keys)` entry the candidate items are read from the ranking counter,
 * scored through `getDecayedCountsAsync`, sorted by descending score and cut to `kValue` entries.
 * Rate and trend counters are scored by combining the decayed counts of their action policy and
 * their base policy via `reduceRateRankingValue`; for a trend counter the base counts are read
 * over the range whose both ends are shifted with `add(-1)`.
 *
 * The policy lookups and the item fetch run synchronously while the futures are being built, so
 * failures there are thrown from this method rather than delivered through the returned future.
 * The returned future fails when a BLOB item id cannot be resolved to its blob value.
 *
 * @param policy  counter policy being queried
 * @param dimKeys ranking keys grouped by dimension; every key sequence must be non-empty because
 *                the time qualifiers of its last and first key are read unconditionally as the
 *                (from, to) range -- presumably keys are ordered newest first, verify with caller
 * @param kValue  maximum number of ranked items per dimension
 * @param qsSum   passed through unchanged to `getDecayedCountsAsync`
 * @return a future holding one `RankCounterDimensionItem` per entry of `dimKeys`
 * @throws NoSuchElementException if the action or base policy of a rate/trend counter cannot be
 *                                resolved
 */
def getSumRankCounterResultAsync(policy: Counter,
                                 dimKeys: Seq[(Map[String, String], Seq[RankingKey])],
                                 kValue: Int,
                                 qsSum: Option[String]): Future[RankCounterResult] = {
  val futures = dimKeys.map { case (dimension, keys) =>
    val tqs = keys.map(rk => rk.eq.tq)
    val (tqFrom, tqTo) = (tqs.last, tqs.head)
    val items = rankingCounter(policy.version).getAllItems(keys, kValue)

    val futureScores = {
      if (policy.isRateCounter || policy.isTrendCounter) {
        // Same exception type as a bare Option.get, but the message says what was missing.
        val actionPolicy = policy.rateActionId.flatMap(counterModel.findById(_)).getOrElse(
          throw new NoSuchElementException(
            s"not found rate action policy. ${policy.service}.${policy.action}"))
        val basePolicy = policy.rateBaseId.flatMap(counterModel.findById(_)).getOrElse(
          throw new NoSuchElementException(
            s"not found rate base policy. ${policy.service}.${policy.action}"))

        // Rate counters read action and base over the same range; trend counters (only reached
        // when the policy is not a rate counter) read the base over the range shifted by add(-1).
        val baseRange =
          if (policy.isRateCounter) (tqFrom, tqTo)
          else (tqFrom.add(-1), tqTo.add(-1))

        // NOTE(review): -1 looks like a placeholder for the side this lookup does not supply,
        // to be merged by reduceRateRankingValue -- confirm against its implementation.
        val futureAction = getDecayedCountsAsync(actionPolicy, items, (tqFrom, tqTo), dimension, qsSum).map { seq =>
          seq.map { case (k, score) =>
            ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(score, -1)
          }.toMap
        }
        val futureBase = getDecayedCountsAsync(basePolicy, items, baseRange, dimension, qsSum).map { seq =>
          seq.map { case (k, score) =>
            ExactKey(policy, k.itemKey, checkItemType = false) -> RateRankingValue(-1, score)
          }.toMap
        }

        futureAction.zip(futureBase).map { case (actionScores, baseScores) =>
          reduceRateRankingValue(actionScores, baseScores).map { case (k, rrv) =>
            k -> rrv.rankingValue.score
          }.toSeq
        }
      }
      else {
        getDecayedCountsAsync(policy, items, (tqFrom, tqTo), dimension, qsSum)
      }
    }

    futureScores.map { keyWithScore =>
      // Highest score first; ranks are 1-based.
      val rankCounterItems = keyWithScore.sortBy(-_._2).take(kValue).zipWithIndex.map {
        case ((exactKey, score), idx) =>
          val realId = policy.itemType match {
            // BLOB items are reported by their stored blob value instead of the item key.
            case ItemType.BLOB => exactCounter(policy.version).getBlobValue(policy, exactKey.itemKey)
              .getOrElse(throw new Exception(s"not found blob id. ${policy.service}.${policy.action} ${exactKey.itemKey}"))
            case _ => exactKey.itemKey
          }
          RankCounterItem(idx + 1, realId, score)
      }
      // The dimension item is labelled with the "from" end of the queried range.
      val qualifier = ExactQualifier(tqFrom, dimension)
      RankCounterDimensionItem(qualifier.tq.q.toString, qualifier.tq.ts, qualifier.dimension, -1, rankCounterItems)
    }
  }

  Future.sequence(futures).map { dimensionResultList =>
    RankCounterResult(RankCounterResultMeta(policy.service, policy.action), dimensionResultList)
  }
}