in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala [198:290]
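// Behaviour while a batch write to tag_views is in flight: new tagged events
// are buffered, flush requests are deferred, and the outcome of the write is
// handled via TagWriteDone / TagWriteFailed.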
private def writeInProgress(
buffer: Buffer,
tagPidSequenceNrs: Map[PersistenceId, TagPidSequenceNr],
awaitingFlush: Option[ActorRef]): Receive = {
case DropState(pid) =>
log.debug("Dropping state for pid: [{}]", pid)
become(writeInProgress(buffer.remove(pid), tagPidSequenceNrs - pid, awaitingFlush))
case InternalFlush =>
// Ignore, we will check when the write is done
case Flush =>
log.debug("External flush while write in progress. Will flush after write complete")
become(writeInProgress(buffer, tagPidSequenceNrs, Some(sender())))
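// Tag pid sequence numbers are assigned as soon as the events arrive so the
// ordering is fixed, even though the actual write is deferred until the
// in-flight one completes.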
case TagWrite(_, payload, _) =>
val (updatedTagPidSequenceNrs, events) =
assignTagPidSequenceNumbers(payload.toVector, tagPidSequenceNrs)
val awaitingWrite = AwaitingWrite(events, OptionVal(sender()))
val now = System.nanoTime()
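// Rate-limited warning: log at most once per bufferWarningMinDurationNs
// once the buffer exceeds four full batches.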
if (buffer.size > (4 * settings.maxBatchSize) && now > (lastLoggedBufferNs + bufferWarningMinDurationNs)) {
lastLoggedBufferNs = now
log.warning(
"Buffer for tagged events is getting too large ({}), is Cassandra responsive? Are writes failing? " +
"If events are buffered for longer than the eventual-consistency-delay they won't be picked up by live queries. The oldest event in the buffer is offset: {}",
buffer.size,
formatOffset(buffer.nextBatch.head.events.head._1.timeUuid))
}
// buffer until current query is finished
// Don't sort until the write has finished
val newBuffer = buffer.addPending(awaitingWrite)
become(writeInProgress(newBuffer, updatedTagPidSequenceNrs, awaitingFlush))
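// The in-flight batch completed: ack the writes in that batch, persist the
// per-pid tag progress (fire and forget), then service any deferred flush.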
case TagWriteDone(summary, doneNotify) =>
log.debug("Tag write done: {}", summary)
val nextBuffer = buffer.writeComplete()
buffer.nextBatch.foreach { write =>
write.ack match {
case OptionVal.None =>
// OptionVal is a non-boxing Option; reading `.x` in the catch-all case
// returns the wrapped ActorRef without allocating an extractor result
case ref => ref.x ! Done
}
}
summary.foreach {
case (id, PidProgress(_, seqNrTo, tagPidSequenceNr, offset)) =>
// These writes do not block future writes. We don't read the tag progress again from C*
// until a restart has happened. This is best effort; we expect recovery to replay any
// events that aren't in the tag progress table
session.writeProgress(tag, id, seqNrTo, tagPidSequenceNr, offset).onComplete {
case Success(_) =>
case Failure(t) =>
log.warning(
"Tag progress write has failed for pid: {} seqNrTo: {} tagPidSequenceNr: {} offset: {}. " +
"If this is the only Cassandra error things will continue to work, but if this keeps happening it " +
s"will mean slower recovery as tag_views will need to be repaired. Reason: $t",
id,
seqNrTo,
tagPidSequenceNr,
formatOffset(offset))
parent ! TagWriters.TagWriteFailed(t)
}
}
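// A Flush arrived during the write: keep writing until the buffer is
// drained before confirming to the requester.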
awaitingFlush match {
case Some(replyTo) =>
log.debug("External flush request")
if (buffer.pending.nonEmpty) {
write(nextBuffer, tagPidSequenceNrs, awaitingFlush)
} else {
replyTo ! FlushComplete
context.become(idle(nextBuffer, tagPidSequenceNrs))
}
case None =>
flushIfRequired(nextBuffer, tagPidSequenceNrs)
}
sendPubsubNotification()
doneNotify.foreach(_ ! FlushComplete)
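// The batch write failed: the buffer is kept intact so the events are
// retried on the next InternalFlush, and the failure is reported to the
// parent TagWriters.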
case TagWriteFailed(t, _) =>
log.warning(
"Writing tags has failed. This means that any eventsByTag query will be out of date. " +
"The write will be retried. Reason {}",
t)
timers.startSingleTimer(FlushKey, InternalFlush, settings.flushInterval)
parent ! TagWriters.TagWriteFailed(t)
context.become(idle(buffer, tagPidSequenceNrs))
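// Drop any buffered events for the pid and continue numbering from the
// tag pid sequence number carried by the supplied TagProgress.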
case ResetPersistenceId(_, tp @ TagProgress(pid, _, _)) =>
log.debug("Resetting persistence id {}. TagProgress {}", pid, tp)
become(writeInProgress(buffer.remove(pid), tagPidSequenceNrs + (pid -> tp.pidTagSequenceNr), awaitingFlush))
sender() ! ResetPersistenceIdComplete
case ReceiveTimeout =>
// a write is in progress, so ignore the idle timeout
case StopTagWriter =>
// no longer idle, so cancel the pending passivation
parent ! CancelPassivateTagWriter(tag)
}