private[pekko] def props()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala [51:105]


  /**
   * Creates the `Props` describing a [[TagWriter]].
   *
   * All four arguments are forwarded unchanged to the `TagWriter` constructor. The instance is
   * constructed inside the `Props(...)` argument expression rather than being built up front.
   *
   * @param settings configuration forwarded to the writer
   * @param session  session forwarded to the writer
   * @param tag      the tag forwarded to the writer
   * @param parent   actor reference forwarded to the writer as its parent
   * @return the `Props` wrapping the `TagWriter` construction
   */
  private[pekko] def props(
      settings: TagWriterSettings,
      session: TagWritersSession,
      tag: Tag,
      parent: ActorRef): Props = {
    Props(new TagWriter(settings, session, tag, parent))
  }

  /**
   * Configuration values handed to a tag writer.
   *
   * NOTE(review): apart from `stopTagWriterWhenIdle`, the parameter descriptions are inferred from
   * the field names only — confirm against the code that reads them.
   *
   * @param maxBatchSize          presumably the maximum number of buffered events written in one batch
   * @param flushInterval         presumably how long buffered events may wait before a flush is forced
   * @param scanningFlushInterval presumably an alternative flush interval used while scanning — TODO confirm
   * @param stopTagWriterWhenIdle idle period after which the TagWriter sends `PassivateTagWriter` to its parent
   * @param pubsubNotification    typed as `Duration` (not `FiniteDuration`) unlike the other intervals;
   *                              presumably a non-finite value turns notifications off — TODO confirm
   */
  private[pekko] case class TagWriterSettings(
      maxBatchSize: Int,
      flushInterval: FiniteDuration,
      scanningFlushInterval: FiniteDuration,
      stopTagWriterWhenIdle: FiniteDuration,
      pubsubNotification: Duration)

  /**
   * Progress recorded for one persistence id; carried by [[ResetPersistenceId]].
   *
   * NOTE(review): parameter meanings below are inferred from the names — confirm against usage.
   *
   * @param persistenceId    the persistence id this progress belongs to
   * @param sequenceNr       presumably the latest event sequence number covered by this progress
   * @param pidTagSequenceNr presumably the latest per-persistence-id sequence number within the tag
   */
  private[pekko] case class TagProgress(
      persistenceId: PersistenceId,
      sequenceNr: SequenceNr,
      pidTagSequenceNr: TagPidSequenceNr)

  /**
   * Reset pid tag sequence numbers to the given [[TagProgress]] and discard any messages for the given persistenceId.
   *
   * Marked with `NoSerializationVerificationNeeded`.
   *
   * @param tag      the tag the reset applies to
   * @param progress the progress to reset to; it also carries the affected persistence id
   */
  private[pekko] case class ResetPersistenceId(tag: Tag, progress: TagProgress)
      extends NoSerializationVerificationNeeded

  /**
   * Sent in response to a [[ResetPersistenceId]].
   */
  private[pekko] case object ResetPersistenceIdComplete

  // Request to flush the buffer regardless of its current size.
  private[pekko] case object Flush
  // NOTE(review): presumably the acknowledgement sent back once a `Flush` has been handled — confirm against the sender.
  private[pekko] case object FlushComplete

  // The "passivate pattern" is used to avoid losing messages between TagWriters (parent)
  // and TagWriter when the TagWriter is stopped due to inactivity.
  // When idle for longer than configured stopTagWriterWhenIdle the TagWriter sends `PassivateTagWriter` to parent,
  // which replies with `StopTagWriter` and starts buffering incoming messages for the tag.
  // The TagWriter stops when receiving StopTagWriter if it is still ok to passivate (no state, nothing in progress).
  // TagWriters (parent) sends buffered messages if any when the TagWriter has been terminated.
  // Sent by an idle TagWriter to its parent to ask for passivation; `tag` names the writer's tag.
  private[pekko] final case class PassivateTagWriter(tag: String)
  // NOTE(review): presumably tells the parent that a previously requested passivation for `tag`
  // should no longer go ahead — confirm against the sender and receiver.
  private[pekko] final case class CancelPassivateTagWriter(tag: String)
  // Parent's reply to `PassivateTagWriter`; the TagWriter stops on receipt if it is still ok to passivate.
  private[pekko] case object StopTagWriter

  // Summary of a tag write: one `PidProgress` entry per persistence id.
  type TagWriteSummary = Map[PersistenceId, PidProgress]
  // Progress of one persistence id, used as the value type of `TagWriteSummary`.
  // NOTE(review): presumably `seqNrFrom`..`seqNrTo` is the range of sequence numbers written, `tagPidSequenceNr`
  // the latest per-pid tag sequence number and `offset` the UUID of the latest written event — confirm.
  case class PidProgress(seqNrFrom: SequenceNr, seqNrTo: SequenceNr, tagPidSequenceNr: TagPidSequenceNr, offset: UUID)
  // Private to the enclosing scope, so neither object is part of the external message protocol.
  // NOTE(review): presumably `InternalFlush` is a self-sent flush trigger and `FlushKey` the key of the
  // timer that schedules it — confirm against the actor implementation.
  private case object InternalFlush
  private case object FlushKey
  // Outcome of a tag write. Sealed: the only subtypes are `TagWriteDone` and `TagWriteFailed` below.
  sealed trait TagWriteFinished
  // Successful outcome carrying the per-persistence-id summary.
  // NOTE(review): `doneNotify` is presumably an actor to inform once the write is done — confirm.
  final case class TagWriteDone(summary: TagWriteSummary, doneNotify: Option[ActorRef]) extends TagWriteFinished
  // Failed outcome carrying the cause and the writes that failed. Private, unlike `TagWriteDone`.
  private final case class TagWriteFailed(reason: Throwable, failedEvents: Vector[AwaitingWrite])
      extends TagWriteFinished

  // NOTE(review): presumably asks the TagWriter to discard whatever state it holds for `pid` — confirm against the handler.
  private[pekko] case class DropState(pid: PersistenceId)

  /**
   * `Ordering` over `UUID`s that delegates every comparison to `UUIDComparator.comparator`.
   *
   * NOTE(review): the name suggests it is meant for time-based UUIDs — confirm against `UUIDComparator`.
   */
  val timeUuidOrdering: Ordering[UUID] = new Ordering[UUID] {
    override def compare(left: UUID, right: UUID): Int = {
      val delegate = UUIDComparator.comparator
      delegate.compare(left, right)
    }
  }