in core/src/main/scala/org/apache/pekko/persistence/cassandra/Extractors.scala [182:214]
private def extractPersistentRepr(row: Row, ed: EventDeserializer, s: Serialization, async: Boolean)(
implicit ec: ExecutionContext): Future[PersistentRepr] = {
def deserializeEvent(): Future[PersistentRepr] = {
ed.deserializeEvent(row, async).map {
case DeserializedEvent(payload, metadata: OptionVal[Any]) =>
val repr = PersistentRepr(
payload,
sequenceNr = row.getLong("sequence_nr"),
persistenceId = row.getString("persistence_id"),
manifest = row.getString("event_manifest"), // manifest for event adapters
deleted = false,
sender = null,
writerUuid = row.getString("writer_uuid"))
metadata match {
case OptionVal.None => repr
case some => repr.withMetadata(some.x)
}
}
}
if (ed.columnDefinitionCache.hasMessageColumn(row)) {
row.getByteBuffer("message") match {
case null => deserializeEvent()
case bytes =>
// For backwards compatibility, reading serialized PersistentRepr from "message" column.
// Used in v0.6 and earlier. In later versions the "event" column is used for the serialized event.
Future.successful(persistentFromByteBuffer(s, bytes))
}
} else {
deserializeEvent()
}
}