in actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala [300:676]
private def active(s: State[A]): Behavior[InternalCommand] = {
def onMessage(msg: A, wasStashed: Boolean, replyTo: Option[ActorRef[Done]], totalSeqNr: TotalSeqNr): State[A] = {
val consumersWithDemand = s.out.iterator.filter { case (_, out) => out.askNextTo.isDefined }.toVector
if (traceEnabled)
context.log.traceN(
"Received message seqNr [{}], wasStashed [{}], consumersWithDemand [{}], hasRequested [{}].",
totalSeqNr,
wasStashed,
consumersWithDemand.map(_._1).mkString(", "),
s.requested)
if (!s.requested && !wasStashed && durableQueue.isEmpty)
throw new IllegalStateException(s"Unexpected message [$msg], wasn't requested nor unstashed.")
val selectedWorker =
if (durableQueue.isDefined) {
s.preselectedWorkers.get(totalSeqNr) match {
case Some(PreselectedWorker(outKey, confirmationQualifier)) =>
s.out.get(outKey) match {
case Some(out) => Right(outKey -> out)
case None =>
// the preselected was deregistered in the meantime
context.self ! ResendDurableMsg(msg, confirmationQualifier, totalSeqNr)
Left(s)
}
case None =>
throw new IllegalStateException(s"Expected preselected worker for seqNr [$totalSeqNr].")
}
} else {
selectWorker() match {
case Some(w) => Right(w)
case None =>
checkStashFull(stashBuffer)
context.log.debug("Stashing message, seqNr [{}]", totalSeqNr)
stashBuffer.stash(Msg(msg, wasStashed = true, replyTo))
val newRequested = if (wasStashed) s.requested else false
Left(s.copy(requested = newRequested))
}
}
selectedWorker match {
case Right((outKey, out)) =>
val newUnconfirmed = out.unconfirmed :+ Unconfirmed(totalSeqNr, out.seqNr, msg, replyTo)
val newOut = s.out.updated(outKey, out.copy(unconfirmed = newUnconfirmed, askNextTo = None))
implicit val askTimeout: Timeout = workerAskTimeout
context.ask[ProducerController.MessageWithConfirmation[A], OutSeqNr](
out.askNextTo.get,
ProducerController.MessageWithConfirmation(msg, _)) {
case Success(seqNr) => Ack(outKey, seqNr)
case Failure(_) => AskTimeout(outKey, out.seqNr)
}
def tellRequestNext(): Unit = {
if (traceEnabled)
context.log.trace("Sending RequestNext to producer, seqNr [{}].", totalSeqNr)
s.producer ! requestNext
}
val hasMoreDemand = consumersWithDemand.size >= 2
// decision table based on s.hasRequested, wasStashed, hasMoreDemand, stashBuffer.isEmpty
val newRequested =
if (s.requested && !wasStashed && hasMoreDemand) {
// request immediately since more demand
tellRequestNext()
true
} else if (s.requested && !wasStashed && !hasMoreDemand) {
// wait until more demand
false
} else if (!s.requested && wasStashed && hasMoreDemand && stashBuffer.isEmpty) {
// msg was unstashed, the last from stash
tellRequestNext()
true
} else if (!s.requested && wasStashed && hasMoreDemand && stashBuffer.nonEmpty) {
// more in stash
false
} else if (!s.requested && wasStashed && !hasMoreDemand) {
// wait until more demand
false
} else if (s.requested && wasStashed) {
// msg was unstashed, but pending request already in progress
true
} else if (durableQueue.isDefined && !s.requested && !wasStashed) {
// msg ResendDurableMsg, and stashed before storage
false
} else {
throw new IllegalStateException(s"Invalid combination of hasRequested [${s.requested}], " +
s"wasStashed [$wasStashed], hasMoreDemand [$hasMoreDemand], stashBuffer.isEmpty [${stashBuffer.isEmpty}]")
}
s.copy(out = newOut, requested = newRequested, preselectedWorkers = s.preselectedWorkers - totalSeqNr)
case Left(newState) =>
newState
}
}
def workersWithDemand: Vector[(OutKey, OutState[A])] =
s.out.iterator.filter { case (_, out) => out.askNextTo.isDefined }.toVector
def selectWorker(): Option[(OutKey, OutState[A])] = {
val preselected = s.preselectedWorkers.valuesIterator.map(_.outKey).toSet
val workers = workersWithDemand.filterNot {
case (outKey, _) => preselected(outKey)
}
if (workers.isEmpty) {
None
} else {
val i = ThreadLocalRandom.current().nextInt(workers.size)
Some(workers(i))
}
}
def onMessageBeforeDurableQueue(msg: A, replyTo: Option[ActorRef[Done]]): Behavior[InternalCommand] = {
selectWorker() match {
case Some((outKey, out)) =>
storeMessageSent(
MessageSent(
s.currentSeqNr,
msg,
ack = replyTo.isDefined,
out.confirmationQualifier,
System.currentTimeMillis()),
attempt = 1)
val newReplyAfterStore = replyTo match {
case None => s.replyAfterStore
case Some(r) => s.replyAfterStore.updated(s.currentSeqNr, r)
}
active(
s.copy(
currentSeqNr = s.currentSeqNr + 1,
preselectedWorkers =
s.preselectedWorkers.updated(s.currentSeqNr, PreselectedWorker(outKey, out.confirmationQualifier)),
replyAfterStore = newReplyAfterStore))
case None =>
checkStashFull(stashBuffer)
// no demand from any workers, or all already preselected
context.log.debug("Stash before storage, seqNr [{}]", s.currentSeqNr)
// not stored yet, so don't treat it as stashed
stashBuffer.stash(Msg(msg, wasStashed = false, replyTo))
active(s)
}
}
def onResendDurableMsg(resend: ResendDurableMsg[A]): Behavior[InternalCommand] = {
require(durableQueue.isDefined, "Unexpected ResendDurableMsg when DurableQueue not defined.")
selectWorker() match {
case Some((outKey, out)) =>
storeMessageSent(
MessageSent(s.currentSeqNr, resend.msg, false, out.confirmationQualifier, System.currentTimeMillis()),
attempt = 1)
// When StoreMessageSentCompleted (oldConfirmationQualifier, oldSeqNr) confirmation will be stored
active(
s.copy(
currentSeqNr = s.currentSeqNr + 1,
preselectedWorkers =
s.preselectedWorkers.updated(s.currentSeqNr, PreselectedWorker(outKey, out.confirmationQualifier)),
handOver =
s.handOver.updated(s.currentSeqNr, HandOver(resend.oldConfirmationQualifier, resend.oldSeqNr))))
case None =>
checkStashFull(stashBuffer)
// no demand from any workers, or all already preselected
context.log.debug("Stash before storage of resent durable message, seqNr [{}].", s.currentSeqNr)
// not stored yet, so don't treat it as stashed
stashBuffer.stash(resend)
active(s)
}
}
def receiveStoreMessageSentCompleted(seqNr: SeqNr, m: A) = {
s.replyAfterStore.get(seqNr).foreach { replyTo =>
if (traceEnabled)
context.log.trace("Sending reply for seqNr [{}] after storage.", seqNr)
replyTo ! Done
}
val wasHandOver =
s.handOver.get(seqNr) match {
case Some(HandOver(oldConfirmationQualifier, oldSeqNr)) =>
durableQueue.foreach { d =>
d ! StoreMessageConfirmed(oldSeqNr, oldConfirmationQualifier, System.currentTimeMillis())
}
true
case None =>
false
}
val newState = onMessage(m, wasStashed = wasHandOver, replyTo = None, seqNr)
active(newState.copy(replyAfterStore = newState.replyAfterStore - seqNr, handOver = newState.handOver - seqNr))
}
def receiveAck(ack: Ack): Behavior[InternalCommand] = {
s.out.get(ack.outKey) match {
case Some(outState) =>
val newUnconfirmed = onAck(outState, ack.confirmedSeqNr)
active(s.copy(out = s.out.updated(ack.outKey, outState.copy(unconfirmed = newUnconfirmed))))
case None =>
// obsolete Next, ConsumerController already deregistered
Behaviors.unhandled
}
}
def onAck(outState: OutState[A], confirmedSeqNr: OutSeqNr): Vector[Unconfirmed[A]] = {
val (confirmed, newUnconfirmed) = outState.unconfirmed.partition {
case Unconfirmed(_, seqNr, _, _) => seqNr <= confirmedSeqNr
}
if (confirmed.nonEmpty) {
if (traceEnabled)
context.log.trace("Received Ack seqNr [{}] from worker [{}].", confirmedSeqNr, outState.confirmationQualifier)
confirmed.foreach {
case Unconfirmed(_, _, _, None) => // no reply
case Unconfirmed(_, _, _, Some(replyTo)) =>
replyTo ! Done
}
durableQueue.foreach { d =>
// Storing the confirmedSeqNr can be "write behind", at-least-once delivery
d ! StoreMessageConfirmed(
confirmed.last.totalSeqNr,
outState.confirmationQualifier,
System.currentTimeMillis())
}
}
newUnconfirmed
}
def receiveWorkerRequestNext(w: WorkerRequestNext[A]): Behavior[InternalCommand] = {
val next = w.next
val outKey = next.producerId
s.out.get(outKey) match {
case Some(outState) =>
val confirmedSeqNr = w.next.confirmedSeqNr
if (traceEnabled)
context.log.trace2(
"Received RequestNext from worker [{}], confirmedSeqNr [{}].",
w.next.producerId,
confirmedSeqNr)
val newUnconfirmed = onAck(outState, confirmedSeqNr)
val newOut =
s.out.updated(
outKey,
outState
.copy(seqNr = w.next.currentSeqNr, unconfirmed = newUnconfirmed, askNextTo = Some(next.askNextTo)))
if (stashBuffer.nonEmpty) {
context.log.debug2("Unstash [{}] after RequestNext from worker [{}]", stashBuffer.size, w.next.producerId)
stashBuffer.unstashAll(active(s.copy(out = newOut)))
} else if (s.requested) {
active(s.copy(out = newOut))
} else {
if (traceEnabled)
context.log
.trace("Sending RequestNext to producer after RequestNext from worker [{}].", w.next.producerId)
s.producer ! requestNext
active(s.copy(out = newOut, requested = true))
}
case None =>
// obsolete Next, ConsumerController already deregistered
Behaviors.unhandled
}
}
def receiveCurrentWorkers(curr: CurrentWorkers[A]): Behavior[InternalCommand] = {
// TODO #28722 we could also track unreachable workers and avoid them when selecting worker
val addedWorkers = curr.workers.diff(s.workers)
val removedWorkers = s.workers.diff(curr.workers)
val newState = addedWorkers.foldLeft(s) { (acc, c) =>
val uuid = UUID.randomUUID().toString
val outKey = s"$producerId-$uuid"
context.log.debug2("Registered worker [{}], with producerId [{}].", c, outKey)
val p = context.spawn(
ProducerController[A](outKey, durableQueueBehavior = None, producerControllerSettings),
uuid,
DispatcherSelector.sameAsParent())
p ! ProducerController.Start(workerRequestNextAdapter)
p ! ProducerController.RegisterConsumer(c)
acc.copy(out = acc.out.updated(outKey, OutState(p, c, 0L, Vector.empty, None)))
}
val newState2 = removedWorkers.foldLeft(newState) { (acc, c) =>
acc.out.find { case (_, outState) => outState.consumerController == c } match {
case Some((key, outState)) =>
context.log.debug2("Deregistered worker [{}], with producerId [{}].", c, key)
context.stop(outState.producerController)
// resend the unconfirmed, sending to self since order of messages for WorkPulling doesn't matter anyway
if (outState.unconfirmed.nonEmpty)
context.log.debugN(
"Resending unconfirmed from deregistered worker with producerId [{}], from seqNr [{}] to [{}].",
key,
outState.unconfirmed.head.outSeqNr,
outState.unconfirmed.last.outSeqNr)
outState.unconfirmed.foreach {
case Unconfirmed(totalSeqNr, _, msg, replyTo) =>
if (durableQueue.isEmpty)
context.self ! Msg(msg, wasStashed = true, replyTo)
else
context.self ! ResendDurableMsg(msg, outState.confirmationQualifier, totalSeqNr)
}
acc.copy(out = acc.out - key)
case None =>
context.log.debug("Deregistered non-existing worker [{}]", c)
acc
}
}
active(newState2.copy(workers = curr.workers))
}
def receiveStart(start: Start[A]): Behavior[InternalCommand] = {
ProducerControllerImpl.enforceLocalProducer(start.producer)
context.log.debug("Register new Producer [{}], currentSeqNr [{}].", start.producer, s.currentSeqNr)
if (s.requested)
start.producer ! requestNext
active(s.copy(producer = start.producer))
}
Behaviors.receiveMessage {
case Msg(msg: A, wasStashed, replyTo) =>
if (durableQueue.isEmpty || wasStashed)
active(onMessage(msg, wasStashed, replyTo, s.currentSeqNr))
else
onMessageBeforeDurableQueue(msg, replyTo)
case MessageWithConfirmation(msg: A, replyTo) =>
if (durableQueue.isEmpty)
active(onMessage(msg, wasStashed = false, Some(replyTo), s.currentSeqNr))
else
onMessageBeforeDurableQueue(msg, Some(replyTo))
case m: ResendDurableMsg[A @unchecked] =>
onResendDurableMsg(m)
case StoreMessageSentCompleted(MessageSent(seqNr, m: A, _, _, _)) =>
receiveStoreMessageSentCompleted(seqNr, m)
case f: StoreMessageSentFailed[A @unchecked] =>
receiveStoreMessageSentFailed(f)
case ack: Ack =>
receiveAck(ack)
case w: WorkerRequestNext[A @unchecked] =>
receiveWorkerRequestNext(w)
case curr: CurrentWorkers[A @unchecked] =>
receiveCurrentWorkers(curr)
case GetWorkerStats(replyTo) =>
replyTo ! WorkerStats(s.workers.size)
Behaviors.same
case RegisterConsumerDone =>
Behaviors.same
case start: Start[A @unchecked] =>
receiveStart(start)
case AskTimeout(outKey, outSeqNr) =>
context.log.debug(
"Message seqNr [{}] sent to worker [{}] timed out. It will be be redelivered.",
outSeqNr,
outKey)
Behaviors.same
case DurableQueueTerminated =>
throw new IllegalStateException("DurableQueue was unexpectedly terminated.")
case unexpected =>
throw new RuntimeException(s"Unexpected message: $unexpected")
}
}