in projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala [989:1013]
private def createRecordWithOffset[Envelope](envelope: Envelope): Option[RecordWithOffset] = {
envelope match {
case eventEnvelope: EventEnvelope[_] if eventEnvelope.offset.isInstanceOf[TimestampOffset] =>
val timestampOffset = eventEnvelope.offset.asInstanceOf[TimestampOffset]
Some(
RecordWithOffset(
Record(eventEnvelope.persistenceId, eventEnvelope.sequenceNr, timestampOffset.timestamp),
timestampOffset,
strictSeqNr = true,
envelopeLoaded = eventEnvelope.eventOption.isDefined))
case change: UpdatedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] =>
val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
Some(
RecordWithOffset(
Record(change.persistenceId, change.revision, timestampOffset.timestamp),
timestampOffset,
strictSeqNr = false,
envelopeLoaded = change.value != null))
case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] =>
// FIXME case DeletedDurableState when that is added
throw new IllegalArgumentException(
s"DurableStateChange [${change.getClass.getName}] not implemented yet. Please report bug at https://github.com/apache/incubator-pekko-persistence-r2dbc/issues")
case _ => None
}
}