in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala [51:105]
private[pekko] def props(settings: TagWriterSettings, session: TagWritersSession, tag: Tag, parent: ActorRef): Props =
Props(new TagWriter(settings, session, tag, parent))
private[pekko] case class TagWriterSettings(
maxBatchSize: Int,
flushInterval: FiniteDuration,
scanningFlushInterval: FiniteDuration,
stopTagWriterWhenIdle: FiniteDuration,
pubsubNotification: Duration)
private[pekko] case class TagProgress(
persistenceId: PersistenceId,
sequenceNr: SequenceNr,
pidTagSequenceNr: TagPidSequenceNr)
/*
* Reset pid tag sequence numbers to the given [[TagProgress]] and discard any messages for the given persistenceId
*/
private[pekko] case class ResetPersistenceId(tag: Tag, progress: TagProgress)
extends NoSerializationVerificationNeeded
/*
* Sent in response to a [[ResetPersistenceId]]
*/
private[pekko] case object ResetPersistenceIdComplete
// Flush buffer regardless of size
private[pekko] case object Flush
private[pekko] case object FlushComplete
// The "passivate pattern" is used to avoid loosing messages between TagWriters (parent)
// and TagWriter when the TagWriter is stopped due to inactivity.
// When idle for longer than configured stopTagWriterWhenIdle the TagWriter sends `PassivateTagWriter` to parent,
// which replies with `StopTagWriter` and starts buffering incoming messages for the tag.
// The TagWriter stops when receiving StopTagWriter if it is still ok to passivate (no state, nothing in progress).
// TagWriters (parent) sends buffered messages if any when the TagWriter has been terminated.
private[pekko] final case class PassivateTagWriter(tag: String)
private[pekko] final case class CancelPassivateTagWriter(tag: String)
private[pekko] case object StopTagWriter
type TagWriteSummary = Map[PersistenceId, PidProgress]
case class PidProgress(seqNrFrom: SequenceNr, seqNrTo: SequenceNr, tagPidSequenceNr: TagPidSequenceNr, offset: UUID)
private case object InternalFlush
private case object FlushKey
sealed trait TagWriteFinished
final case class TagWriteDone(summary: TagWriteSummary, doneNotify: Option[ActorRef]) extends TagWriteFinished
private final case class TagWriteFailed(reason: Throwable, failedEvents: Vector[AwaitingWrite])
extends TagWriteFinished
private[pekko] case class DropState(pid: PersistenceId)
val timeUuidOrdering: Ordering[UUID] = new Ordering[UUID] {
override def compare(x: UUID, y: UUID) =
UUIDComparator.comparator.compare(x, y)
}