in s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala [372:481]
/**
 * Folds the per-request results of one query step into a single [[StepResult]].
 *
 * Cursors and failure counts of every partial result are always accumulated. When the
 * future yields no partial results, `StepResult.Empty` (carrying the accumulated failure
 * count) is returned. Otherwise the edges are run through `buildResult` and the step-level
 * group-by filter, and then:
 *
 *  - for an intermediate step, or for the last step when `buildLastStepInnerResult` is set,
 *    the edges are returned as they are, with no grouping applied;
 *  - for the last step with `buildLastStepInnerResult` unset, each edge is projected onto the
 *    selected columns, decorated with its order-by / group-by / filter-out values, and the
 *    result is either ordered (no query-level group-by keys) or grouped and sorted by
 *    descending score sum (query-level group-by keys present).
 *
 * @param q                        query being executed; supplies the steps and the query option
 * @param stepIdx                  index of the current step within `q.steps`
 * @param queryRequests            requests issued for this step, zipped positionally with the results
 * @param queryResultLsFuture      future holding one partial [[StepResult]] per request
 * @param queryParams              not read by this method
 * @param buildLastStepInnerResult when true, the last step is treated like an intermediate step
 * @param parentEdges              parent edges per vertex id, forwarded to `buildResult`
 * @param ec                       execution context on which the future is mapped
 * @return future of the combined result for this step
 */
def filterEdges(q: Query,
                stepIdx: Int,
                queryRequests: Seq[QueryRequest],
                queryResultLsFuture: Future[Seq[StepResult]],
                queryParams: Seq[QueryParam],
                buildLastStepInnerResult: Boolean = true,
                parentEdges: Map[VertexId, Seq[EdgeWithScore]])
               (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = {
  queryResultLsFuture.map { partials =>
    // Bookkeeping shared by every outcome: all cursors plus the summed failure count.
    val cursors = ArrayBuffer.empty[Array[Byte]]
    partials.foreach(partial => cursors ++= partial.cursors)
    val failCount = partials.foldLeft(0)((acc, partial) => acc + partial.failCount)

    if (partials.nonEmpty) {
      val queryOption = q.queryOption
      val step = q.steps(stepIdx)
      val isLastStep = stepIdx == q.steps.size - 1
      val requestsWithResults = queryRequests.view.zip(partials)
      val degrees = partials.flatMap(partial => partial.degreeEdges)
      // Only the last step, and only when inner results were not requested, gets the
      // fully decorated (ordered / grouped) treatment.
      val buildFinalResult = isLastStep && !buildLastStepInnerResult

      if (buildFinalResult) {
        val decorated = buildResult(q, stepIdx, requestsWithResults, parentEdges) { (edgeWithScore, propsSelectColumns) =>
          val sourceEdge = edgeWithScore.edge

          /* Select: keep only the requested property columns on the edge */
          val selectedEdge = sourceEdge.copyEdgeWithState(sourceEdge.propertyValuesInner(propsSelectColumns))
          val selected = edgeWithScore.copy(edge = selectedEdge)

          /* OrderBy: without explicit keys, fall back to (score, timestamp) of the source edge */
          val orderByValues =
            if (queryOption.orderByKeys.nonEmpty) StepResult.toTuple4(selected.toValues(queryOption.orderByKeys))
            else (edgeWithScore.score, sourceEdge.getTs(), None, None)

          /* StepGroupBy, GroupBy and FilterOut values are all read from the selected edge */
          selected.copy(
            orderByValues = orderByValues,
            stepGroupByValues = selected.toValues(step.groupBy.keys),
            groupByValues = selected.toValues(queryOption.groupBy.keys),
            filterOutValues = selected.toValues(queryOption.filterOutFields))
        }

        /* process step group by */
        val results = StepResult.filterOutStepGroupBy(decorated, step.groupBy)
        val hasGroupBy = queryOption.groupBy.keys.nonEmpty

        /* process ordered list: only produced when no query-level group-by is requested */
        val ordered = if (hasGroupBy) Nil else StepResult.orderBy(queryOption, results)

        /* process grouped list: only produced when a query-level group-by is requested */
        val grouped =
          if (!hasGroupBy) Nil
          else {
            val agg = new scala.collection.mutable.HashMap[StepResult.GroupByKey, (Double, StepResult.Values)]()
            results.groupBy(_.groupByValues).foreach { case (_, members) =>
              val (scoreSum, merged) = StepResult.mergeOrdered(members, Nil, queryOption)
              /*
               * The key is taken from the merged edges as typed values, not strings:
               * calling toString on Any here would lose the type information that is
               * needed later for toJson.
               */
              if (merged.nonEmpty) {
                agg(merged.head.groupByValues) = (scoreSum, merged)
              }
            }
            // Highest score sum first.
            agg.toSeq.sortBy { case (_, (scoreSum, _)) => -scoreSum }
          }

        StepResult(edgeWithScores = ordered, grouped = grouped, degreeEdges = degrees, cursors = cursors, failCount = failCount)
      } else {
        // Inner result: edges are handed through unchanged.
        val passedThrough = buildResult(q, stepIdx, requestsWithResults, parentEdges) { (edgeWithScore, _) =>
          edgeWithScore
        }

        /* process step group by */
        val results = StepResult.filterOutStepGroupBy(passedThrough, step.groupBy)
        StepResult(edgeWithScores = results, grouped = Nil, degreeEdges = degrees, cursors = cursors, failCount = failCount)
      }
    } else {
      StepResult.Empty.copy(failCount = failCount)
    }
  }
}