in s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala [83:137]
/**
 * Converts raw storage key-values into a [[StepResult]] for a single query request.
 *
 * Processing steps, in order:
 *  - An empty `kvs` yields `StepResult.Empty` carrying only `dummyCursor`.
 *  - Unless the query targets snapshot edges, the first key-value is deserialized as an
 *    index edge. If that element reports `isDegree`, it is returned separately in
 *    `degreeEdges` (with score 1.0) and removed from the key-values that are converted.
 *  - The cursor is the row of the last remaining key-value, or `dummyCursor` if none remain.
 *  - Only key-values whose index lies in `[startOffset, startOffset + len)` are converted.
 *  - Sampling and normalization are applied only when `queryOption.ignorePrevStepCache`
 *    is set; otherwise the converted edges are returned as they are.
 *
 * @param kvs          raw key-values, convertible through the `CanSKeyValue` type class
 * @param queryRequest supplies the query option and query parameter driving the conversion
 * @param prevScore    not referenced in this method body; kept for interface compatibility
 * @param isInnerCall  forwarded to `toSnapshotEdge`
 * @param parentEdges  forwarded to `toSnapshotEdge`, `toEdge` and `edgeToEdgeWithScore`
 * @param startOffset  index (after the degree element is removed) of the first key-value to convert
 * @param len          maximum number of key-values to convert, counted from `startOffset`
 * @return the step result holding the converted edges, the degree edges and the cursor
 */
def toEdges[K: CanSKeyValue](kvs: Seq[K],
                             queryRequest: QueryRequest,
                             prevScore: Double = 1.0,
                             isInnerCall: Boolean,
                             parentEdges: Seq[EdgeWithScore],
                             startOffset: Int = 0,
                             len: Int = Int.MaxValue): StepResult = {
  val toSKeyValue = implicitly[CanSKeyValue[K]].toSKeyValue _

  if (kvs.isEmpty) StepResult.Empty.copy(cursors = Seq(dummyCursor))
  else {
    val queryOption = queryRequest.query.queryOption
    val queryParam = queryRequest.queryParam
    val label = queryParam.label
    val schemaVer = queryParam.label.schemaVersion

    // Only the first key-value is deserialized here; the result is also handed to
    // `toEdge` below as its cache element.
    val cacheElementOpt =
      if (queryParam.isSnapshotEdge) None
      else serDe.indexEdgeDeserializer(schemaVer).fromKeyValues(Seq(kvs.head), None)

    // A leading degree element is reported separately and is not converted as an edge.
    val (degreeEdges, keyValues) = cacheElementOpt match {
      case Some(head) if head.isDegree => (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail)
      case _ => (Nil, kvs)
    }

    val lastCursor: Seq[Array[Byte]] =
      Seq(keyValues.lastOption.map(last => toSKeyValue(last).row).getOrElse(dummyCursor))

    // Compute the exclusive upper bound in Long. In Int arithmetic, the default
    // len = Int.MaxValue combined with any startOffset > 0 wraps to a negative number,
    // which made the window empty and silently dropped every edge.
    val endExclusive: Long = startOffset.toLong + len.toLong

    val edgeWithScores = for {
      (kv, idx) <- keyValues.zipWithIndex if idx >= startOffset && idx < endExclusive
      edge <- (if (queryParam.isSnapshotEdge) toSnapshotEdge(kv, queryRequest, None, isInnerCall, parentEdges) else toEdge(kv, queryRequest, cacheElementOpt, parentEdges)).toSeq
      edgeWithScore <- edgeToEdgeWithScore(queryRequest, edge, parentEdges)
    } yield edgeWithScore

    val resultEdges =
      if (!queryOption.ignorePrevStepCache) edgeWithScores
      else {
        val sampled =
          if (queryParam.sample >= 0) sample(edgeWithScores, queryParam.offset, queryParam.sample)
          else edgeWithScores
        if (queryParam.shouldNormalize) normalize(sampled) else sampled
      }

    StepResult(edgeWithScores = resultEdges, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor)
  }
}