def deserializeEvent()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [876:929]


    /**
     * Deserializes the event stored in `row`, together with its optional metadata.
     *
     * The event payload is read from the `event`, `ser_id` and `ser_manifest` columns. When the
     * serializer registered for `ser_id` is an `AsyncSerializer`, its `fromBinaryAsync` is invoked
     * with transport information set; otherwise `serialization.deserialize` is used.
     *
     * Metadata is read from the `meta`, `meta_ser_id` and `meta_ser_manifest` columns when
     * `columnDefinitionCache` reports that the row has them. A metadata deserialization failure is
     * logged as a warning and the metadata is dropped; it does not fail the returned future.
     *
     * @param row   the row to read the serialized event (and optional metadata) from
     * @param async for non-`AsyncSerializer` serializers: when `true` the deserialization is run in a
     *              `Future` on `ec`, when `false` it is performed on the calling thread
     * @param ec    execution context used to run the deserialization when `async` is `true`
     * @return a future completed with the deserialized event and metadata, or failed with the
     *         deserialization error or any non-fatal exception thrown while reading the row
     */
    def deserializeEvent(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[DeserializedEvent] =
      try {

        // Deliberately a `def`: it is only referenced from the final `map`, so the metadata is read
        // and deserialized only once the event itself has been deserialized successfully.
        def meta: OptionVal[AnyRef] = {
          if (columnDefinitionCache.hasMetaColumns(row)) {
            row.getByteBuffer("meta") match {
              case null =>
                OptionVal.None // no meta data
              case metaBytes =>
                // has meta data, wrap in EventWithMetaData
                val metaSerId = row.getInt("meta_ser_id")
                val metaSerManifest = row.getString("meta_ser_manifest")
                serialization.deserialize(Bytes.getArray(metaBytes), metaSerId, metaSerManifest) match {
                  case Success(m) => OptionVal.Some(m)
                  case Failure(ex) =>
                    // Best effort: a broken metadata payload must not prevent the event from being delivered.
                    log.warning(
                      "Deserialization of event metadata failed (pid: [{}], seq_nr: [{}], meta_ser_id: [{}], meta_ser_manifest: [{}]), ignoring metadata content. Exception: {}",
                      Array(
                        row.getString("persistence_id"),
                        row.getLong("sequence_nr"),
                        metaSerId,
                        metaSerManifest,
                        ex.toString))
                    OptionVal.None
                }
            }
          } else {
            // for backwards compatibility, when table was not altered, meta columns not added
            OptionVal.None // no meta data
          }
        }

        val bytes = Bytes.getArray(row.getByteBuffer("event"))
        val serId = row.getInt("ser_id")
        val manifest = row.getString("ser_manifest")

        (serialization.serializerByIdentity.get(serId) match {
          case Some(asyncSerializer: AsyncSerializer) =>
            Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
              asyncSerializer.fromBinaryAsync(bytes, manifest)
            }

          case _ =>
            // Serialization.deserialize adds transport info
            if (async) Future(serialization.deserialize(bytes, serId, manifest).get)
            else Future.fromTry(serialization.deserialize(bytes, serId, manifest))
        }).map(event => DeserializedEvent(event, meta))(ExecutionContexts.parasitic)

      } catch {
        // Exceptions thrown synchronously (e.g. while reading columns) are reported through the future.
        case NonFatal(e) => Future.failed(e)
      }