override def fromKeyValues[T: CanSKeyValue]()

in s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala [39:116]


  /**
   * Decodes one stored key-value into a snapshot edge.
   *
   * Row key, read left to right:
   *  - the source vertex id;
   *  - a target inner value, present only when the bytes remaining after the source id are
   *    not exactly 5 (otherwise the target reuses the source's inner id);
   *  - a 4-byte int holding the label id together with the direction;
   *  - the label-index / is-inverted marker (presumably a single byte, given the 5-byte check
   *    above -- confirm against `bytesToLabelIndexSeqWithIsInverted`).
   *
   * Value, read left to right:
   *  - one byte holding status code and operation;
   *  - the edge properties with timestamps;
   *  - optionally, when bytes remain: one status/operation byte, the pending edge's properties,
   *    and an 8-byte long used as the pending edge's lock timestamp.
   *
   * @param _kvs            key-values convertible through [[CanSKeyValue]]; exactly one is expected
   * @param cacheElementOpt not read by this implementation
   * @return `None` when the row is not flagged as inverted, or when an `Exception` is raised
   *         while decoding (the exception is logged); otherwise the decoded snapshot edge
   */
  override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
                                              cacheElementOpt: Option[SnapshotEdge]): Option[SnapshotEdge] = {
    try {
      val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) }
      // NOTE(review): assuming this is Predef.assert, a violation raises AssertionError, which is
      // a java.lang.Error and therefore NOT handled by the `case e: Exception` clause below.
      assert(kvs.size == 1)

      val kv = kvs.head
      val version = kv.timestamp

      // ---- row key ----
      val (srcVertexId, srcIdLen) =
        SourceVertexId.fromBytes(kv.row, 0, kv.row.length, HBaseType.DEFAULT_VERSION)

      // Exactly 5 trailing bytes means no separate target id was written into the row.
      val isTallSchema = srcIdLen + 5 != kv.row.length
      val (tgtVertexId, labelPos) =
        if (isTallSchema) {
          val (tgtId, tgtBytesLen) =
            InnerVal.fromBytes(kv.row, srcIdLen, kv.row.length, HBaseType.DEFAULT_VERSION)
          (TargetVertexId(ServiceColumn.Default, tgtId), srcIdLen + tgtBytesLen)
        } else {
          (TargetVertexId(ServiceColumn.Default, srcVertexId.innerId), srcIdLen)
        }

      val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, labelPos, 4))
      // Only the inverted flag is needed here; the label index sequence is discarded.
      val (_, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, labelPos + 4)

      if (!isInverted) None
      else {
        val label = Label.findById(labelWithDir.labelId)
        val schemaVer = label.schemaVersion

        // ---- value ----
        val (statusCode, op) = statusCodeWithOp(kv.value(0))
        val (props, propsEnd) = bytesToKeyValuesWithTs(kv.value, 1, schemaVer, label)
        val propsMap = props.toMap
        val tsInnerVal = propsMap(LabelMeta.timestamp).innerVal
        val ts = tsInnerVal.toString.toLong

        // Anything left after the properties describes a pending edge.
        val pendingEdgeOpt =
          if (propsEnd == kv.value.length) None
          else {
            val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(kv.value(propsEnd))
            val (pendingEdgeProps, pendingPropsEnd) =
              bytesToKeyValuesWithTs(kv.value, propsEnd + 1, schemaVer, label)
            val lockTs = Option(Bytes.toLong(kv.value, pendingPropsEnd, 8))

            val pendingEdge =
              builder.newEdge(
                builder.newVertex(srcVertexId, version),
                builder.newVertex(tgtVertexId, version),
                label, labelWithDir.dir, pendingEdgeOp,
                version, pendingEdgeProps.toMap,
                statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal))

            Option(pendingEdge)
          }

        val snapshotEdge = builder.newSnapshotEdge(
          builder.newVertex(srcVertexId, ts),
          builder.newVertex(tgtVertexId, ts),
          label, labelWithDir.dir, op, version, propsMap, statusCode = statusCode,
          pendingEdgeOpt = pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal))

        Option(snapshotEdge)
      }
    } catch {
      case e: Exception =>
        logger.error("#" * 100, e)
        None
    }
  }