def buildRequest()

in s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala [215:307]


  /**
   * Builds the HBase read request for one edge lookup described by `queryRequest`.
   *
   * Two request shapes are produced:
   *  - `Right(ScanWithRange(...))` when the label's schema version is `HBaseType.VERSION4`
   *    and the query has no target vertex: a row-range scan over the edge column family.
   *  - `Left(GetRequest)` in every other case: a single-row GET, narrowed to one qualifier
   *    when a target vertex is given, otherwise filtered by column pagination (and by a
   *    column range when the query carries an interval).
   *
   * @param client       HBase client handed to `AsynchbasePatcher.newScanner` on the scan path
   * @param serDe        source of the snapshot-edge / index-edge serializers
   * @param queryRequest request whose `queryParam` drives every option set on the RPC
   * @param edge         edge used to derive the row key, the index edge and the interval bytes
   * @return `Left` with the GET request or `Right` with the scanner and its row range
   * @throws RuntimeException on the scan path when `edge.edgesWithIndex` holds no entry whose
   *                          label index seq equals `queryParam.labelOrderSeq`
   */
  def buildRequest(client: HBaseClient, serDe: StorageSerDe, queryRequest: QueryRequest, edge: S2EdgeLike) = {
    val queryParam = queryRequest.queryParam
    val label = queryParam.label
    val hasTargetVertex = queryParam.tgtVertexInnerIdOpt.isDefined

    // A known target vertex is looked up through the snapshot edge, anything else through the index edge.
    val serializer =
      if (hasTargetVertex) serDe.snapshotEdgeSerializer(edge.toSnapshotEdge)
      else serDe.indexEdgeSerializer(edge.toIndexEdge(queryParam.labelOrderSeq))

    val rowKey = serializer.toRowKey
    val (fromTs, untilTs) = queryParam.durationOpt.getOrElse((0L, Long.MaxValue))
    val (upperIntervalBytes, lowerIntervalBytes) = queryParam.buildInterval(Option(edge))

    // Scan path: range scan whose keys are prefixed by srcId + labelWithDir + labelIndexSeq.
    def newScanRequest() = {
      val scanner = AsynchbasePatcher.newScanner(client, label.hbaseTableName)
      scanner.setFamily(SKeyValue.EdgeCf)

      /*
       * TODO: remove this part.
       */
      val indexEdge = edge.edgesWithIndex.find(_.labelIndex.seq == queryParam.labelOrderSeq) match {
        case Some(found) => found
        case None => throw new RuntimeException(s"Can`t find index for query $queryParam")
      }

      val keyPrefix = Bytes.add(
        VertexId.toSourceVertexId(indexEdge.srcVertex.id).bytes,
        indexEdge.labelWithDir.bytes,
        StorageSerializable.labelOrderSeqWithIsInverted(indexEdge.labelIndexSeq, isInverted = false)
      )

      val intervalGiven = queryParam.intervalOpt.isDefined
      val cursorBytesOpt = queryParam.cursorOpt.map(cursor => Base64.getDecoder.decode(cursor))
      val resumesFromCursor = cursorBytesOpt.isDefined

      // A decoded cursor always wins as the start key; otherwise start at the interval's
      // max bytes when an interval is set, or at the bare prefix when it is not.
      val startKey = cursorBytesOpt.getOrElse {
        if (intervalGiven) Bytes.add(keyPrefix, upperIntervalBytes) else keyPrefix
      }

      /*
       * note: since propsToBytes encode size of property map at first byte, we are sure about max value here
       */
      val stopKey =
        if (intervalGiven) Bytes.add(keyPrefix, lowerIntervalBytes)
        else Bytes.add(keyPrefix, Array.fill(1)(-1))

      scanner.setStartKey(startKey)
      scanner.setStopKey(stopKey)

      if (queryParam.limit == Int.MinValue) logger.debug(s"MinValue: $queryParam")

      // TODO: exclusive condition innerOffset with cursorOpt
      // With a cursor only `limit` rows are requested; without one, offset + limit rows.
      val rowCap =
        if (resumesFromCursor) queryParam.limit
        else queryParam.innerOffset + queryParam.innerLimit

      scanner.setMaxVersions(1)
      scanner.setMaxNumRows(rowCap)
      // The max timestamp is set before the min, exactly as the original ordering did.
      scanner.setMaxTimestamp(untilTs)
      scanner.setMinTimestamp(fromTs)
      scanner.setRpcTimeout(queryParam.rpcTimeout)

      // SET option for this rpc properly.
      Right(ScanWithRange(scanner, 0, rowCap))
    }

    // Get path: single-row GET on the serializer's row key.
    def newGetRequest() = {
      val tableBytes = label.hbaseTableName.getBytes
      val get =
        if (hasTargetVertex) new GetRequest(tableBytes, rowKey, SKeyValue.EdgeCf, serializer.toQualifier)
        else new GetRequest(tableBytes, rowKey, SKeyValue.EdgeCf)

      get.maxVersions(1)
      get.setFailfast(true)
      get.setMinTimestamp(fromTs)
      get.setMaxTimestamp(untilTs)
      get.setTimeout(queryParam.rpcTimeout)

      val pageFilter = new ColumnPaginationFilter(queryParam.innerLimit, queryParam.innerOffset)
      // The range filter is added only when the query carries an interval; both bounds are inclusive.
      val rangeFilterOpt = queryParam.intervalOpt.map { _ =>
        new ColumnRangeFilter(upperIntervalBytes, true, lowerIntervalBytes, true)
      }
      get.setFilter(new FilterList(pageFilter +: rangeFilterOpt.toSeq, MUST_PASS_ALL))
      Left(get)
    }

    val scanApplies = HBaseType.VERSION4 == label.schemaVersion && !hasTargetVertex
    if (scanApplies) newScanRequest() else newGetRequest()
  }