in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriters.scala [126:194]
/**
 * Builds a [[BulkTagWrite]] that contains exactly one [[TagWrite]] and no entries without tags.
 *
 * @param tw the single tag write to wrap
 * @param owner not referenced by this factory; it is only part of the signature
 * @return a bulk write whose `tagWrites` is the one-element list holding `tw` and whose
 *         `withoutTags` is empty
 */
def apply(tw: TagWrite, owner: ActorRef): BulkTagWrite = {
  val singleTagWrite = List(tw)
  BulkTagWrite(singleTagWrite, Nil)
}
}
/**
 * A batch made of several [[TagWrite]]s plus a sequence of `Serialized` entries kept apart
 * from them.
 *
 * All tag writes should be for the same persistenceId
 *
 * @param tagWrites the tag writes contained in this batch
 * @param withoutTags `Serialized` entries that are not wrapped in a [[TagWrite]];
 *                    NOTE(review): presumably the events that have no tags, going by the
 *                    field name - confirm against the code that builds this message
 */
private[pekko] case class BulkTagWrite(tagWrites: immutable.Seq[TagWrite], withoutTags: immutable.Seq[Serialized])
extends NoSerializationVerificationNeeded
/**
 * A sequence of `Serialized` entries paired with the single `tag` they belong to.
 *
 * All serialised should be for the same persistenceId
 *
 * @param tag the tag associated with `serialised`
 * @param serialised the entries carried by this write
 * @param actorRunning migration sends these messages without the actor running so TagWriters should not
 *                     validate that the pid is running; defaults to `true`
 */
private[pekko] case class TagWrite(tag: Tag, serialised: immutable.Seq[Serialized], actorRunning: Boolean = true)
extends NoSerializationVerificationNeeded
/**
 * Creates the `Props` used to start a [[TagWriters]] actor.
 *
 * @param settings passed unchanged to the [[TagWriters]] constructor
 * @param tagWriterSession passed unchanged to the [[TagWriters]] constructor
 * @return `Props` whose creator instantiates a new [[TagWriters]] each time it is evaluated
 */
def props(settings: TagWriterSettings, tagWriterSession: TagWritersSession): Props = {
  // Kept as a def so that every evaluation of the by-name creator builds a fresh actor instance.
  def newTagWriters: TagWriters = new TagWriters(settings, tagWriterSession)
  Props(newTagWriters)
}
// Message protocol declared alongside the TagWriters actor.
// NOTE(review): the handlers for these messages are not next to the declarations; every
// "presumably" below is inferred from the names and should be confirmed against `receive`.

/** Carries the `timeout` to use for flushing all tag writers. */
final case class FlushAllTagWriters(timeout: Timeout)
/** NOTE(review): presumably the reply to [[FlushAllTagWriters]] - confirm. */
case object AllFlushed
/** Associates a map of per-tag [[TagProgress]] values with the persistence id `pid`. */
final case class SetTagProgress(pid: String, tagProgresses: Map[Tag, TagProgress])
/** NOTE(review): presumably the acknowledgement for [[SetTagProgress]] - confirm. */
case object TagProcessAck
/**
 * Pairs a persistence id with the `ActorRef` of its persistent actor.
 * NOTE(review): presumably sent when that persistent actor starts - confirm.
 */
final case class PersistentActorStarting(pid: String, persistentActor: ActorRef)
/** NOTE(review): presumably the acknowledgement for [[PersistentActorStarting]] - confirm. */
case object PersistentActorStartingAck
/** Carries the `reason` (a `Throwable`) of a failed tag write. */
final case class TagWriteFailed(reason: Throwable)
// Private messages: not part of the protocol visible to other classes.
/** NOTE(review): presumably a periodic timer tick that triggers writing tag scanning - confirm. */
private case object WriteTagScanningTick
/**
 * Outcome of a tag scanning write.
 * NOTE(review): the unit of `startTime` and the exact meaning of `size` are not visible at the
 * declaration - confirm where the message is created.
 */
private case class WriteTagScanningCompleted(result: Try[Done], startTime: Long, size: Int)
/** NOTE(review): presumably signals that the persistent actor `ref` for `pid` has terminated - confirm. */
private case class PersistentActorTerminated(pid: PersistenceId, ref: ActorRef)
/** NOTE(review): presumably signals that the tag writer for `tag` has terminated - confirm. */
private case class TagWriterTerminated(tag: String)
/**
 * Pairs a message with the promise for its response.
 * NOTE(review): presumably held in a buffer while a tag writer is passivating, going by the
 * name - confirm against the code that creates these entries.
 *
 * @param message the message to send
 * @param response promise associated with `message`; NOTE(review): presumably completed with the
 *                 reply once `message` has been delivered - confirm
 */
private case class PassivateBufferEntry(message: Any, response: Promise[Any])
}
/**
* INTERNAL API
* Manages all the tag writers.
*/
@InternalApi private[pekko] class TagWriters(settings: TagWriterSettings,
tagWriterSession: TagWriters.TagWritersSession)
extends Actor
with Timers
with ActorLogging {
import TagWriters._
import context.dispatcher
// eager init and val because used from Future callbacks
override val log: LoggingAdapter = super.log
// Escalate to the journal so it can stop
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {