in s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala [214:301]
def merges(globalQueryOption: QueryOption,
multiStepResults: Seq[StepResult],
weights: Seq[Double] = Nil,
filterOutStepResult: StepResult): StepResult = {
val degrees = multiStepResults.flatMap(_.degreeEdges)
val ls = new mutable.ListBuffer[EdgeWithScore]()
val agg= new mutable.HashMap[GroupByKey, ListBuffer[EdgeWithScore]]()
val sums = new mutable.HashMap[GroupByKey, Double]()
val filterOutSet = filterOutStepResult.edgeWithScores.foldLeft(Set.empty[Seq[Option[Any]]]) { case (prev, t) =>
prev + t.filterOutValues
}
for {
(weight, eachStepResult) <- weights.zip(multiStepResults)
(ordered, grouped) = (eachStepResult.edgeWithScores, eachStepResult.grouped)
} {
ordered.foreach { t =>
val filterOutKey = t.filterOutValues
if (!filterOutSet.contains(filterOutKey)) {
val newScore = t.score * weight
val newT = t.copy(score = newScore)
// val newOrderByValues = updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, newScore)
val newOrderByValues =
if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTs(), None, None)
else toTuple4(newT.toValues(globalQueryOption.orderByKeys))
val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys)
ls += t.copy(score = newScore, orderByValues = newOrderByValues, groupByValues = newGroupByValues)
}
}
// process each query's stepResult's grouped
for {
(groupByKey, (scoreSum, values)) <- grouped
} {
val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[EdgeWithScore])
var scoreSum = 0.0
var isEmpty = true
values.foreach { t =>
val filterOutKey = t.filterOutValues
if (!filterOutSet.contains(filterOutKey)) {
isEmpty = false
val newScore = t.score * weight
val newT = t.copy(score = newScore)
// val newOrderByValues = updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, newScore)
val newOrderByValues =
if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTs(), None, None)
else toTuple4(newT.toValues(globalQueryOption.orderByKeys))
val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys)
buffer += t.copy(score = newScore, orderByValues = newOrderByValues, groupByValues = newGroupByValues)
scoreSum += newScore
}
}
if (!isEmpty) sums += (groupByKey -> scoreSum)
}
}
// process global groupBy
val (ordered, grouped) = if (globalQueryOption.groupBy.keys.nonEmpty) {
for {
edgeWithScore <- ls
groupByKey = edgeWithScore.groupByValues
} {
val buffer = agg.getOrElseUpdate(groupByKey, ListBuffer.empty[EdgeWithScore])
buffer += edgeWithScore
val newScore = sums.getOrElse(groupByKey, 0.0) + edgeWithScore.score
sums += (groupByKey -> newScore)
}
val grouped = for {
(groupByKey, scoreSum) <- sums.toSeq.sortBy(_._2 * -1)
aggregated = agg(groupByKey) if aggregated.nonEmpty
sorted = orderBy(globalQueryOption, aggregated)
} yield (groupByKey, (scoreSum, sorted))
(Nil, grouped)
} else {
val ordered = orderBy(globalQueryOption, ls)
(ordered, Nil)
}
StepResult(edgeWithScores = ordered, grouped = grouped, degrees, failCount = multiStepResults.map(_.failCount).sum)
}