in s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala [215:307]
def buildRequest(client: HBaseClient, serDe: StorageSerDe, queryRequest: QueryRequest, edge: S2EdgeLike) = {
val queryParam = queryRequest.queryParam
val label = queryParam.label
val serializer = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
val snapshotEdge = edge.toSnapshotEdge
serDe.snapshotEdgeSerializer(snapshotEdge)
} else {
val indexEdge = edge.toIndexEdge(queryParam.labelOrderSeq)
serDe.indexEdgeSerializer(indexEdge)
}
val rowKey = serializer.toRowKey
val (minTs, maxTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge))
label.schemaVersion match {
case HBaseType.VERSION4 if queryParam.tgtVertexInnerIdOpt.isEmpty =>
val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName)
scanner.setFamily(SKeyValue.EdgeCf)
/*
* TODO: remove this part.
*/
val indexEdgeOpt = edge.edgesWithIndex.find(edgeWithIndex => edgeWithIndex.labelIndex.seq == queryParam.labelOrderSeq)
val indexEdge = indexEdgeOpt.getOrElse(throw new RuntimeException(s"Can`t find index for query $queryParam"))
val srcIdBytes = VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes
val labelWithDirBytes = indexEdge.labelWithDir.bytes
val labelIndexSeqWithIsInvertedBytes = StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
val baseKey = Bytes.add(srcIdBytes, labelWithDirBytes, labelIndexSeqWithIsInvertedBytes)
val (startKey, stopKey) =
if (queryParam.intervalOpt.isDefined) {
// interval is set.
val _startKey = queryParam.cursorOpt match {
case Some(cursor) => Base64.getDecoder.decode(cursor)
case None => Bytes.add(baseKey, intervalMaxBytes)
}
(_startKey , Bytes.add(baseKey, intervalMinBytes))
} else {
/*
* note: since propsToBytes encode size of property map at first byte, we are sure about max value here
*/
val _startKey = queryParam.cursorOpt match {
case Some(cursor) => Base64.getDecoder.decode(cursor)
case None => baseKey
}
(_startKey, Bytes.add(baseKey, Array.fill(1)(-1)))
}
scanner.setStartKey(startKey)
scanner.setStopKey(stopKey)
if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")
scanner.setMaxVersions(1)
// TODO: exclusive condition innerOffset with cursorOpt
if (queryParam.cursorOpt.isDefined) {
scanner.setMaxNumRows(queryParam.limit)
} else {
scanner.setMaxNumRows(queryParam.innerOffset + queryParam.innerLimit)
}
scanner.setMaxTimestamp(maxTs)
scanner.setMinTimestamp(minTs)
scanner.setRpcTimeout(queryParam.rpcTimeout)
// SET option for this rpc properly.
if (queryParam.cursorOpt.isDefined) Right(ScanWithRange(scanner, 0, queryParam.limit))
else Right(ScanWithRange(scanner, 0, queryParam.innerOffset + queryParam.innerLimit))
case _ =>
val get = if (queryParam.tgtVertexInnerIdOpt.isDefined) {
new GetRequest(label.hbaseTableName.getBytes, rowKey, SKeyValue.EdgeCf, serializer.toQualifier)
} else {
new GetRequest(label.hbaseTableName.getBytes, rowKey, SKeyValue.EdgeCf)
}
get.maxVersions(1)
get.setFailfast(true)
get.setMinTimestamp(minTs)
get.setMaxTimestamp(maxTs)
get.setTimeout(queryParam.rpcTimeout)
val pagination = new ColumnPaginationFilter(queryParam.innerLimit, queryParam.innerOffset)
val columnRangeFilterOpt = queryParam.intervalOpt.map { interval =>
new ColumnRangeFilter(intervalMaxBytes, true, intervalMinBytes, true)
}
get.setFilter(new FilterList(pagination +: columnRangeFilterOpt.toSeq, MUST_PASS_ALL))
Left(get)
}
}