def apply()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriters.scala [126:194]


    /**
     * Builds a [[BulkTagWrite]] that holds only `tw` and an empty `withoutTags` list.
     *
     * @param tw    the single tag write to wrap
     * @param owner not referenced by this method's body
     */
    def apply(tw: TagWrite, owner: ActorRef): BulkTagWrite =
      BulkTagWrite(List(tw), Nil)
  }

  /**
   * A batch of tag writes, plus serialized events that are listed separately in `withoutTags`.
   *
   * All tag writes should be for the same persistenceId.
   *
   * @param tagWrites   the [[TagWrite]]s in this batch, each carrying its own tag and events
   * @param withoutTags serialized events kept apart from `tagWrites`; presumably events that have
   *                    no tags (inferred from the name, confirm against the senders)
   */
  private[pekko] case class BulkTagWrite(tagWrites: immutable.Seq[TagWrite], withoutTags: immutable.Seq[Serialized])
      extends NoSerializationVerificationNeeded

  /**
   * Serialised events to be written for a single tag.
   *
   * All serialised events should be for the same persistenceId.
   *
   * @param tag          the tag carried by this write
   * @param serialised   the serialised events carried by this write
   * @param actorRunning migration sends these messages without the actor running, in which case
   *                     TagWriters should not validate that the pid is running. Defaults to `true`.
   */
  private[pekko] case class TagWrite(tag: Tag, serialised: immutable.Seq[Serialized], actorRunning: Boolean = true)
      extends NoSerializationVerificationNeeded

  /**
   * Creates the `Props` for a [[TagWriters]] actor.
   *
   * @param settings         passed unchanged to the [[TagWriters]] constructor
   * @param tagWriterSession passed unchanged to the [[TagWriters]] constructor
   * @return `Props` whose creator instantiates a new [[TagWriters]] with the two arguments
   */
  def props(settings: TagWriterSettings, tagWriterSession: TagWritersSession): Props =
    Props(new TagWriters(settings, tagWriterSession))

  /**
   * Message carrying a `Timeout`; by its name, a request to flush every tag writer.
   * NOTE(review): the handler is outside this excerpt, so confirm how `timeout` is applied.
   *
   * @param timeout the timeout supplied by the sender
   */
  final case class FlushAllTagWriters(timeout: Timeout)

  /**
   * Parameterless signal; presumably the reply sent once a [[FlushAllTagWriters]] request has
   * completed (inferred from the name, confirm in the handler).
   */
  case object AllFlushed

  /**
   * Carries, for one persistence id, a map from tag to that tag's `TagProgress`.
   *
   * @param pid           the persistence id the progress entries belong to
   * @param tagProgresses the progress keyed by tag
   */
  final case class SetTagProgress(pid: String, tagProgresses: Map[Tag, TagProgress])

  /**
   * Parameterless signal; presumably the acknowledgement for [[SetTagProgress]]
   * (inferred from the name, confirm in the handler).
   */
  case object TagProcessAck

  /**
   * Pairs a persistence id with the `ActorRef` of its persistent actor; by its name, sent while
   * that actor is starting. NOTE(review): the handler is outside this excerpt.
   *
   * @param pid             the persistence id of the actor
   * @param persistentActor reference to the persistent actor
   */
  final case class PersistentActorStarting(pid: String, persistentActor: ActorRef)

  /**
   * Parameterless signal; presumably the acknowledgement for [[PersistentActorStarting]]
   * (inferred from the name, confirm in the handler).
   */
  case object PersistentActorStartingAck

  /**
   * Wraps a `Throwable`; by its name, reports that a tag write failed.
   * NOTE(review): who sends and who receives this is not visible in this excerpt.
   *
   * @param reason the failure being reported
   */
  final case class TagWriteFailed(reason: Throwable)

  // Private parameterless message. The TagWriters class mixes in Timers, so this is presumably
  // delivered by a timer to trigger a tag scanning write; confirm where it is scheduled.
  private case object WriteTagScanningTick

  /**
   * Private message carrying the outcome of a tag scanning write.
   *
   * @param result    success (`Done`) or failure of the write
   * @param startTime presumably when the write started; the unit is not visible here (TODO confirm)
   * @param size      presumably the number of entries in the write (TODO confirm)
   */
  private case class WriteTagScanningCompleted(result: Try[Done], startTime: Long, size: Int)

  /**
   * Private message pairing a persistence id with an `ActorRef`; by its name, it signals that the
   * referenced persistent actor has terminated. NOTE(review): the sender is not visible here.
   *
   * @param pid the persistence id of the actor
   * @param ref the actor reference concerned
   */
  private case class PersistentActorTerminated(pid: PersistenceId, ref: ActorRef)

  /**
   * Private message carrying a tag; by its name, it signals that the tag writer for that tag has
   * terminated. NOTE(review): the sender is not visible here.
   *
   * @param tag the tag whose writer is concerned
   */
  private case class TagWriterTerminated(tag: String)

  /**
   * Private pairing of a message with a promise for its response; by its name, an entry buffered
   * during passivation (TODO confirm where the buffer is drained).
   *
   * @param message  the message to send
   * @param response promise associated with `message`; presumably completed with the reply once the
   *                 message has been sent (confirm at the usage site)
   */
  private case class PassivateBufferEntry(message: Any, response: Promise[Any])

}

/**
 * INTERNAL API
 * Manages all the tag writers.
 */
@InternalApi private[pekko] class TagWriters(settings: TagWriterSettings,
    tagWriterSession: TagWriters.TagWritersSession)
    extends Actor
    with Timers
    with ActorLogging {

  import TagWriters._
  import context.dispatcher

  // eager init and val because used from Future callbacks
  override val log: LoggingAdapter = super.log

  // Escalate to the journal so it can stop
  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {