in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala [156:196]
/**
 * Message handler for the idle state, parameterised by the state carried from one message
 * to the next.
 *
 * State is never mutated in place: a change is applied by installing a new `idle(...)`
 * behaviour, or by handing the current state to `write` / `flushIfRequired` (both defined
 * elsewhere in this actor).
 *
 * @param buffer            buffer of `AwaitingWrite` entries built up from incoming `TagWrite` messages
 * @param tagPidSequenceNrs the `TagPidSequenceNr` currently tracked for each persistence id
 * @return the `Receive` handling messages while in this state
 */
private def idle(buffer: Buffer, tagPidSequenceNrs: Map[PersistenceId, TagPidSequenceNr]): Receive = {
  case DropState(pid) =>
    log.debug("Dropping state for pid: {}", pid)
    // Forget both the buffered entries and the tracked sequence number for this pid.
    val remainingBuffer = buffer.remove(pid)
    val remainingSeqNrs = tagPidSequenceNrs - pid
    context.become(idle(remainingBuffer, remainingSeqNrs))

  case InternalFlush =>
    // Logged unconditionally, even when the buffer turns out to be empty.
    log.debug("Flushing")
    if (buffer.nonEmpty) write(buffer, tagPidSequenceNrs, None)

  case Flush =>
    val requester = sender()
    if (buffer.nonEmpty) {
      log.debug("External flush request from [{}]. Flushing.", requester)
      // The requester is handed to `write` instead of being answered here.
      write(buffer, tagPidSequenceNrs, Some(requester))
    } else {
      log.debug("External flush request from [{}], buffer empty.", requester)
      // Nothing to write, so the request is acknowledged straight away.
      requester ! FlushComplete
    }

  case TagWrite(_, payload, _) =>
    val (advancedSeqNrs, numberedEvents: Seq[(Serialized, TagPidSequenceNr)]) =
      assignTagPidSequenceNumbers(payload.toVector, tagPidSequenceNrs)
    // The sender is recorded with the write so it can be referred to later.
    val grownBuffer = buffer.add(AwaitingWrite(numberedEvents, OptionVal(sender())))
    flushIfRequired(grownBuffer, advancedSeqNrs)

  case twd: TagWriteDone =>
    // A write completion is not expected in this state; it is only reported.
    log.error("Received Done when in idle state. This is a bug. Please report with DEBUG logs: {}", twd)

  case ResetPersistenceId(_, tp @ TagProgress(pid, _, tagPidSequenceNr)) =>
    log.debug("Resetting pid {}. TagProgress {}", pid, tp)
    // Drop anything buffered for the pid and overwrite its sequence number with the
    // one carried by the TagProgress, then acknowledge the reset.
    val resetBuffer = buffer.remove(pid)
    val resetSeqNrs = tagPidSequenceNrs + (pid -> tagPidSequenceNr)
    become(idle(resetBuffer, resetSeqNrs))
    sender() ! ResetPersistenceIdComplete

  case ReceiveTimeout =>
    // Passivation is only requested when no state at all is being held.
    val holdsNoState = buffer.isEmpty && tagPidSequenceNrs.isEmpty
    if (holdsNoState) parent ! PassivateTagWriter(tag)

  case StopTagWriter =>
    // Stop only if still stateless; otherwise tell the parent to cancel the passivation.
    val holdsNoState = buffer.isEmpty && tagPidSequenceNrs.isEmpty
    if (holdsNoState) context.stop(self)
    else parent ! CancelPassivateTagWriter(tag)
}