in s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala [39:116]
/**
 * Decodes one stored key/value into a [[SnapshotEdge]].
 *
 * Row key, in the order it is read here:
 *  - the source vertex id;
 *  - a target inner id, read only when the bytes left after the source id
 *    are not exactly 5 (otherwise the source inner id doubles as the target);
 *  - a 4-byte int holding the label id and direction;
 *  - the bytes handed to `bytesToLabelIndexSeqWithIsInverted`, from which
 *    only the inverted flag is used.
 *
 * Value, in the order it is read here:
 *  - one status-code/operation byte;
 *  - the edge properties, which must contain `LabelMeta.timestamp`;
 *  - optionally, when bytes remain, a pending edge: one status-code/operation
 *    byte, its properties, then an 8-byte lock timestamp.
 *
 * @param _kvs            raw key/values; exactly one element is expected
 * @param cacheElementOpt not read by this implementation
 * @return the decoded snapshot edge, or `None` when the row is not flagged as
 *         inverted or when decoding throws an `Exception` (which is logged)
 */
override def fromKeyValues[T: CanSKeyValue](_kvs: Seq[T],
                                            cacheElementOpt: Option[SnapshotEdge]): Option[SnapshotEdge] = {
  try {
    val converter = implicitly[CanSKeyValue[T]]
    val kvs = _kvs.map(raw => converter.toSKeyValue(raw))
    // NOTE(review): a failed assert raises AssertionError, which is not an
    // Exception, so it is not turned into None by the handler below.
    assert(kvs.size == 1)

    val kv = kvs.head
    val version = kv.timestamp
    val rowBytes = kv.row

    // ---- row key ----
    val (srcVertexId, srcIdLen) =
      SourceVertexId.fromBytes(rowBytes, 0, rowBytes.length, HBaseType.DEFAULT_VERSION)
    val afterSrcOffset = srcIdLen

    // Exactly 5 trailing bytes means no separate target id was stored.
    val isTallSchema = afterSrcOffset + 5 != rowBytes.length
    val selfTargetVertexId = TargetVertexId(ServiceColumn.Default, srcVertexId.innerId)
    val (tgtVertexId, labelOffset) =
      if (isTallSchema) {
        val (tgtId, tgtBytesLen) =
          InnerVal.fromBytes(rowBytes, afterSrcOffset, rowBytes.length, HBaseType.DEFAULT_VERSION)
        (TargetVertexId(ServiceColumn.Default, tgtId), afterSrcOffset + tgtBytesLen)
      } else {
        (selfTargetVertexId, afterSrcOffset)
      }

    val labelWithDir = LabelWithDirection(Bytes.toInt(rowBytes, labelOffset, 4))
    val (_, isInverted) = bytesToLabelIndexSeqWithIsInverted(rowBytes, labelOffset + 4)

    if (isInverted) {
      val label = Label.findById(labelWithDir.labelId)
      val schemaVer = label.schemaVersion
      val valueBytes = kv.value

      // ---- value: current snapshot state ----
      val (statusCode, op) = statusCodeWithOp(valueBytes(0))
      val (props, propsEndOffset) = bytesToKeyValuesWithTs(valueBytes, 1, schemaVer, label)
      val propsByMeta = props.toMap
      val tsInnerVal = propsByMeta(LabelMeta.timestamp).innerVal
      val ts = tsInnerVal.toString.toLong

      // ---- value: optional pending edge, present only if bytes remain ----
      val maybePendingEdge =
        if (propsEndOffset == valueBytes.length) {
          None
        } else {
          val (pendingEdgeStatusCode, pendingEdgeOp) = statusCodeWithOp(valueBytes(propsEndOffset))
          val (pendingEdgeProps, pendingEndOffset) =
            bytesToKeyValuesWithTs(valueBytes, propsEndOffset + 1, schemaVer, label)
          val lockTs = Option(Bytes.toLong(valueBytes, pendingEndOffset, 8))

          // The pending edge's vertices carry the key/value timestamp, while
          // its tsInnerValOpt is the snapshot's own timestamp property.
          Option(
            builder.newEdge(
              builder.newVertex(srcVertexId, version),
              builder.newVertex(tgtVertexId, version),
              label, labelWithDir.dir, pendingEdgeOp,
              version, pendingEdgeProps.toMap,
              statusCode = pendingEdgeStatusCode, lockTs = lockTs, tsInnerValOpt = Option(tsInnerVal)))
        }

      // The snapshot's vertices carry the timestamp taken from the properties.
      Option(
        builder.newSnapshotEdge(
          builder.newVertex(srcVertexId, ts),
          builder.newVertex(tgtVertexId, ts),
          label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode,
          pendingEdgeOpt = maybePendingEdge, lockTs = None, tsInnerValOpt = Option(tsInnerVal)))
    } else {
      None
    }
  } catch {
    case e: Exception =>
      logger.error("#" * 100, e)
      None
  }
}