private def idle()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala [156:196]


  /**
   * Message handler for the idle state of this tag writer.
   *
   * `DropState` and `ResetPersistenceId` switch to a new `idle` behaviour built from an
   * updated buffer and sequence-number map. Flush requests and `TagWrite` delegate to
   * `write` / `flushIfRequired`. The remaining messages only log, reply, or notify the parent.
   *
   * @param buffer            buffer that incoming `TagWrite` payloads are added to
   * @param tagPidSequenceNrs tag pid sequence number tracked for each persistence id
   * @return the `Receive` that handles messages for the given buffer and sequence numbers
   */
  private def idle(buffer: Buffer, tagPidSequenceNrs: Map[PersistenceId, TagPidSequenceNr]): Receive = {
    case DropState(pid) =>
      log.debug("Dropping state for pid: {}", pid)
      // Forget both the buffered entries and the tracked sequence number of this pid.
      val prunedBuffer = buffer.remove(pid)
      val prunedSequenceNrs = tagPidSequenceNrs - pid
      context.become(idle(prunedBuffer, prunedSequenceNrs))

    case InternalFlush =>
      log.debug("Flushing")
      // Nothing is written when the buffer is empty.
      if (buffer.nonEmpty) write(buffer, tagPidSequenceNrs, None)

    case Flush =>
      val requester = sender()
      if (buffer.nonEmpty) {
        log.debug("External flush request from [{}]. Flushing.", requester)
        write(buffer, tagPidSequenceNrs, Some(requester))
      } else {
        // Empty buffer: acknowledge straight away instead of going through `write`.
        log.debug("External flush request from [{}], buffer empty.", requester)
        requester ! FlushComplete
      }

    case TagWrite(_, payload, _) =>
      val (updatedSequenceNrs, sequenced: Seq[(Serialized, TagPidSequenceNr)]) =
        assignTagPidSequenceNumbers(payload.toVector, tagPidSequenceNrs)
      // Buffer the sequenced events together with the sender, then let
      // `flushIfRequired` decide what happens next.
      flushIfRequired(buffer.add(AwaitingWrite(sequenced, OptionVal(sender()))), updatedSequenceNrs)

    case done: TagWriteDone =>
      // A write completion is not expected in this state; it is only reported.
      log.error("Received Done when in idle state. This is a bug. Please report with DEBUG logs: {}", done)

    case ResetPersistenceId(_, progress @ TagProgress(pid, _, seqNr)) =>
      log.debug("Resetting pid {}. TagProgress {}", pid, progress)
      // Drop what is buffered for the pid and replace its sequence number with the
      // one carried by the tag progress, then confirm to the sender.
      val bufferWithoutPid = buffer.remove(pid)
      val resetSequenceNrs = tagPidSequenceNrs + (pid -> seqNr)
      become(idle(bufferWithoutPid, resetSequenceNrs))
      sender() ! ResetPersistenceIdComplete

    case ReceiveTimeout =>
      // Only notify the parent when neither buffered writes nor sequence numbers are held.
      val nothingHeld = buffer.isEmpty && tagPidSequenceNrs.isEmpty
      if (nothingHeld) {
        parent ! PassivateTagWriter(tag)
      }

    case StopTagWriter =>
      val nothingHeld = buffer.isEmpty && tagPidSequenceNrs.isEmpty
      if (nothingHeld) {
        context.stop(self)
      } else {
        // State is still held, so the stop is not carried out and the parent is told to cancel.
        parent ! CancelPassivateTagWriter(tag)
      }
  }