in s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala [551:639]
/**
 * Gathers the edges produced for one step of `query` into a single result buffer.
 *
 * Each `(QueryRequest, StepResult)` pair is expanded through `toEdgeWithScores`, and every
 * resulting edge is keyed with `toHashKey`. Edges that come from a query param whose
 * `labelWithDir` is flagged `exclude` in this step are never emitted; only their filter keys
 * are remembered. All other edges are converted with `createFunc`, kept in arrival order,
 * and grouped by hash key. A kept edge is later dropped when its filter key was remembered
 * as excluded, unless the same filter key was also seen on a kept edge whose param is
 * flagged `include`.
 *
 * If no two kept edges share a hash key, the surviving edges are emitted in arrival order.
 * Otherwise each group is handed to `processDuplicates`, and only outputs whose score
 * (obtained through `ev`) is at least `queryParam.threshold` are emitted.
 *
 * @param query        query whose `queryOption` and `steps` drive the selection
 * @param stepIdx      index into `query.steps` of the step being built
 * @param stepResultLs per-request results of the step
 * @param parentEdges  forwarded unchanged to `toEdgeWithScores`
 * @param createFunc   turns an edge and the selected label metas into an `R`
 * @param ev           supplies the score of an `R`
 * @tparam R           element type of the returned buffer
 * @return the emitted elements, in the order they were accepted
 */
private def buildResult[R](query: Query,
                           stepIdx: Int,
                           stepResultLs: Seq[(QueryRequest, StepResult)],
                           parentEdges: Map[VertexId, Seq[EdgeWithScore]])
                          (createFunc: (EdgeWithScore, Seq[LabelMeta]) => R)
                          (implicit ev: WithScore[R]): ListBuffer[R] = {
  import scala.collection._

  val accepted = ListBuffer.empty[R]
  // Kept edges in arrival order, together with the keys and param they were built from.
  val ordered = ListBuffer.empty[(HashKey, FilterHashKey, R, QueryParam)]
  // Kept edges grouped by hash key; a group with more than one entry is a duplicate set.
  val grouped = mutable.HashMap.empty[HashKey, ListBuffer[(FilterHashKey, R)]]
  val excludedKeys = mutable.HashSet.empty[FilterHashKey]
  val includedKeys = mutable.HashSet.empty[FilterHashKey]
  var duplicateCount = 0

  val queryOption = query.queryOption
  val step = query.steps(stepIdx)
  val excludeLabelWithDirSet = step.queryParams.collect { case p if p.exclude => p.labelWithDir }.toSet
  val includeLabelWithDirSet = step.queryParams.collect { case p if p.include => p.labelWithDir }.toSet

  // An edge survives unless its filter key was excluded and not explicitly included.
  def isVisible(key: FilterHashKey): Boolean =
    !excludedKeys.contains(key) || includedKeys.contains(key)

  stepResultLs.foreach { case (request, stepResult) =>
    val param = request.queryParam
    val label = param.label
    val fromExcludedParam = excludeLabelWithDirSet.contains(param.labelWithDir)
    val fromIncludedParam = includeLabelWithDirSet.contains(param.labelWithDir)
    // Only the selected columns that this label actually defines are passed on.
    val selectedMetas = queryOption.propsSelectColumns.flatMap { column =>
      label.metaPropsInvMap.get(column)
    }

    toEdgeWithScores(request, stepResult, parentEdges).foreach { edgeWithScore =>
      val (hashKey, filterHashKey) = toHashKey(param, edgeWithScore.edge, isDegree = false)

      if (!fromExcludedParam) {
        if (fromIncludedParam) {
          includedKeys += filterHashKey
        }
        val created = createFunc(edgeWithScore, selectedMetas)
        ordered += ((hashKey, filterHashKey, created, param))
        grouped.get(hashKey) match {
          case Some(bucket) =>
            duplicateCount += 1
            bucket += (filterHashKey -> created)
          case None =>
            grouped.update(hashKey, ListBuffer(filterHashKey -> created))
        }
      } else {
        // Edges of an excluded param only contribute their filter key.
        excludedKeys += filterHashKey
      }
    }
  }

  if (duplicateCount != 0) {
    // At least one hash key occurred more than once: resolve every group.
    val seen = mutable.HashSet.empty[HashKey]
    ordered.foreach { case (hashKey, filterHashKey, _, param) =>
      if (isVisible(filterHashKey) && !seen.contains(hashKey)) {
        processDuplicates(param, grouped(hashKey)).foreach { case (_, candidate) =>
          if (ev.score(candidate) >= param.threshold) {
            // The key is marked as handled only once an output passes the threshold, so a
            // group whose outputs all fall below it is evaluated again at a later entry.
            seen += hashKey
            accepted += candidate
          }
        }
      }
    }
  } else {
    // Every hash key is unique: emit the visible edges as they arrived.
    ordered.foreach { case (_, filterHashKey, created, _) =>
      if (isVisible(filterHashKey)) {
        accepted += created
      }
    }
  }

  accepted
}