private def writeInProgress()

in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala [198:290]


  /**
   * Behavior used while a tag write is outstanding.
   *
   * New `TagWrite`s are only added to the buffer as pending (they are not written from here);
   * the outstanding write is resolved by either `TagWriteDone` or `TagWriteFailed`.
   *
   * @param buffer            the buffered writes; `nextBatch` is acked when the write completes and
   *                          `pending` collects writes that arrive in the meantime
   * @param tagPidSequenceNrs current tag pid sequence number per persistence id
   * @param awaitingFlush     sender of an external `Flush` received during this write, if any; it is sent
   *                          `FlushComplete` on `TagWriteDone` once no pending writes remain
   * @return the `Receive` to install with `become`
   */
  private def writeInProgress(
      buffer: Buffer,
      tagPidSequenceNrs: Map[PersistenceId, TagPidSequenceNr],
      awaitingFlush: Option[ActorRef]): Receive = {
    case DropState(pid) =>
      log.debug("Dropping state for pid: [{}]", pid)
      // Forget both the buffered events and the sequence number of this pid
      become(writeInProgress(buffer.remove(pid), tagPidSequenceNrs - pid, awaitingFlush))
    case InternalFlush =>
    // Ignore, we will check when the write is done
    case Flush =>
      log.debug("External flush while write in progress. Will flush after write complete")
      // Remember who asked so that TagWriteDone can answer once everything is written
      become(writeInProgress(buffer, tagPidSequenceNrs, Some(sender())))
    case TagWrite(_, payload, _) =>
      val (updatedTagPidSequenceNrs, events) =
        assignTagPidSequenceNumbers(payload.toVector, tagPidSequenceNrs)
      val awaitingWrite = AwaitingWrite(events, OptionVal(sender()))
      val now = System.nanoTime()
      // Rate limited: warn at most once per bufferWarningMinDurationNs
      if (buffer.size > (4 * settings.maxBatchSize) && now > (lastLoggedBufferNs + bufferWarningMinDurationNs)) {
        lastLoggedBufferNs = now
        // This lookup is purely diagnostic, so it must never throw: the guard above only checks
        // buffer.size, it does not guarantee that nextBatch (or its first write) is non-empty.
        // Yields the same value as `nextBatch.head.events.head` whenever that would have succeeded.
        val oldestOffset = buffer.nextBatch
          .find(_.events.nonEmpty)
          .map(awaiting => formatOffset(awaiting.events.head._1.timeUuid))
          .getOrElse("unknown")
        log.warning(
          "Buffer for tagged events is getting too large ({}), is Cassandra responsive? Are writes failing? " +
          "If events are buffered for longer than the eventual-consistency-delay they won't be picked up by live queries. The oldest event in the buffer is offset: {}",
          buffer.size,
          oldestOffset)
      }
      // buffer until current query is finished
      // Don't sort until the write has finished
      val newBuffer = buffer.addPending(awaitingWrite)
      become(writeInProgress(newBuffer, updatedTagPidSequenceNrs, awaitingFlush))
    case TagWriteDone(summary, doneNotify) =>
      log.debug("Tag write done: {}", summary)
      val nextBuffer = buffer.writeComplete()
      // Ack the writes of the batch that just completed; these are read from the buffer as it was
      // before writeComplete()
      buffer.nextBatch.foreach { write =>
        write.ack match {
          case OptionVal.None =>
          case ref            => ref.x ! Done
        }
      }
      summary.foreach {
        case (id, PidProgress(_, seqNrTo, tagPidSequenceNr, offset)) =>
          // These writes do not block future writes. We don't read the tag progress again from C*
          // until a restart has happened. This is best effort and expect recovery to replay any
          // events that aren't in the tag progress table
          session.writeProgress(tag, id, seqNrTo, tagPidSequenceNr, offset).onComplete {
            case Success(_) =>
            case Failure(t) =>
              log.warning(
                "Tag progress write has failed for pid: {} seqNrTo: {} tagPidSequenceNr: {} offset: {}. " +
                "If this is the only Cassandra error things will continue to work but if this keeps happening it will " +
                s" mean slower recovery as tag_views will need to be repaired. Reason: $t",
                id,
                seqNrTo,
                tagPidSequenceNr,
                formatOffset(offset))
              parent ! TagWriters.TagWriteFailed(t)
          }
      }
      awaitingFlush match {
        case Some(replyTo) =>
          log.debug("External flush request")
          // Writes that arrived while this one was in flight must be written before the flush is
          // answered; otherwise reply right away and go back to idle
          if (buffer.pending.nonEmpty) {
            write(nextBuffer, tagPidSequenceNrs, awaitingFlush)
          } else {
            replyTo ! FlushComplete
            context.become(idle(nextBuffer, tagPidSequenceNrs))
          }
        case None =>
          flushIfRequired(nextBuffer, tagPidSequenceNrs)
      }
      sendPubsubNotification()
      doneNotify.foreach(_ ! FlushComplete)

    case TagWriteFailed(t, _) =>
      log.warning(
        "Writing tags has failed. This means that any eventsByTag query will be out of date. " +
        "The write will be retried. Reason {}",
        t)
      // Retry: schedule an InternalFlush after flushInterval and keep the buffer unchanged
      timers.startSingleTimer(FlushKey, InternalFlush, settings.flushInterval)
      parent ! TagWriters.TagWriteFailed(t)
      // NOTE(review): awaitingFlush is not carried over to idle, so a waiting flusher gets no
      // FlushComplete from this path - confirm that this is intended
      context.become(idle(buffer, tagPidSequenceNrs))

    case ResetPersistenceId(_, tp @ TagProgress(pid, _, _)) =>
      log.debug("Resetting persistence id {}. TagProgress {}", pid, tp)
      // Drop buffered events of this pid and restart its numbering from the supplied tag progress
      become(writeInProgress(buffer.remove(pid), tagPidSequenceNrs + (pid -> tp.pidTagSequenceNr), awaitingFlush))
      sender() ! ResetPersistenceIdComplete

    case ReceiveTimeout =>
    // not idle

    case StopTagWriter =>
      // not idle any more
      parent ! CancelPassivateTagWriter(tag)
  }