in cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala [300:582]
private def active(s: State[A]): Behavior[InternalCommand] = {
// Handles one message destined for `entityId` and returns the behavior for the resulting state.
//
// Depending on the per-entity `OutState` stored under `outKey` the message is either
//  - sent right away (demand is registered in `nextTo`),
//  - appended to that entity's buffer (no demand), or
//  - used as the first buffered message of a newly spawned ProducerController (no entry yet).
//
// `replyTo` travels with the message (in `Unconfirmed` or `Buffered`), and `newReplyAfterStore`
// replaces `replyAfterStore` in the resulting state.
// Throws IllegalArgumentException when the message has to be buffered but
// `s.bufferSize` has reached `settings.bufferSize`.
def onMessage(
    entityId: EntityId,
    msg: A,
    replyTo: Option[ActorRef[Done]],
    totalSeqNr: TotalSeqNr,
    newReplyAfterStore: Map[TotalSeqNr, ActorRef[Done]]): Behavior[InternalCommand] = {
  val outKey = s"$producerId-$entityId"
  val nextState: State[A] =
    s.out.get(outKey) match {
      case None =>
        // first message for this entity: start a dedicated ProducerController and
        // keep the message buffered until that controller requests it
        context.log.debug("Creating ProducerController for entity [{}]", entityId)
        val sendToRegion: ConsumerController.SequencedMessage[A] => Unit = { seqMsg =>
          region ! ShardingEnvelope(entityId, seqMsg)
        }
        val producerController = context.spawn(
          ProducerController[A](outKey, durableQueueBehavior = None, producerControllerSettings, sendToRegion),
          entityId,
          DispatcherSelector.sameAsParent())
        producerController ! ProducerController.Start(requestNextAdapter)
        val initialOut: OutState[A] =
          OutState(
            entityId,
            producerController,
            None,
            Vector(Buffered(totalSeqNr, msg, replyTo)),
            1L,
            Vector.empty,
            System.nanoTime())
        s.copy(out = s.out.updated(outKey, initialOut), replyAfterStore = newReplyAfterStore)

      case Some(out) =>
        out.nextTo match {
          case Some(demand) =>
            // there is demand, send immediately; the demand is consumed by this message
            send(msg, outKey, out.seqNr, demand)
            val newUnconfirmed = out.unconfirmed :+ Unconfirmed(totalSeqNr, out.seqNr, replyTo)
            val sentOut = out.copy(
              seqNr = out.seqNr + 1,
              nextTo = None,
              unconfirmed = newUnconfirmed,
              usedNanoTime = System.nanoTime())
            s.copy(out = s.out.updated(outKey, sentOut), replyAfterStore = newReplyAfterStore)

          case None =>
            // no demand, buffer (bounded by the total buffer size across all entities)
            if (s.bufferSize >= settings.bufferSize)
              throw new IllegalArgumentException(s"Buffer is full, size [${settings.bufferSize}].")
            context.log.debug(
              "Buffering message to entityId [{}], buffer size for entity [{}]",
              entityId,
              out.buffered.size + 1)
            val newBuffered = out.buffered :+ Buffered(totalSeqNr, msg, replyTo)
            val bufferedState =
              s.copy(
                out = s.out.updated(outKey, out.copy(buffered = newBuffered)),
                replyAfterStore = newReplyAfterStore)
            // send an updated RequestNext to indicate buffer usage
            s.producer ! createRequestNext(bufferedState)
            bufferedState
        }
    }
  active(nextState)
}
// Applies a confirmation up to and including `confirmedSeqNr` to `outState`.
//
// Every unconfirmed entry whose sequence number is <= `confirmedSeqNr` counts as confirmed:
// its `replyTo` (when present) receives `Done`, and when a durable queue is configured the
// `totalSeqNr` of the last confirmed entry is reported with `StoreMessageConfirmed`.
// Returns the entries that are still unconfirmed.
def onAck(outState: OutState[A], confirmedSeqNr: OutSeqNr): Vector[Unconfirmed[A]] = {
  val (acked, remaining) = outState.unconfirmed.partition {
    case Unconfirmed(_, seqNr, _) => seqNr <= confirmedSeqNr
  }

  // entries without a replyTo need no reply
  acked.foreach {
    case Unconfirmed(_, _, replyTo) => replyTo.foreach(_ ! Done)
  }

  acked.lastOption.foreach { lastAcked =>
    durableQueue.foreach { queue =>
      // Storing the confirmedSeqNr can be "write behind", at-least-once delivery
      queue ! StoreMessageConfirmed(lastAcked.totalSeqNr, outState.entityId, System.currentTimeMillis())
    }
  }

  remaining
}
// Continues with a message once storing it has completed.
//
// If a reply is pending in `replyAfterStore` for `seqNr` it is answered with `Done` now;
// the message is then passed to `onMessage` without a replyTo, together with the
// `replyAfterStore` map from which `seqNr` has been removed.
def receiveStoreMessageSentCompleted(
    seqNr: SeqNr,
    msg: A,
    entityId: ConfirmationQualifier): Behavior[InternalCommand] = {
  s.replyAfterStore.get(seqNr) match {
    case Some(replyTo) =>
      context.log.info("Confirmation reply to [{}] after storage", seqNr)
      replyTo ! Done
    case None => // nobody is waiting for a confirmation of this seqNr
  }
  onMessage(entityId, msg, replyTo = None, seqNr, s.replyAfterStore - seqNr)
}
// Reacts to a failed attempt to store a sent message.
//
// While `f.attempt` is below `producerControllerSettings.durableQueueRetryAttempts` the store
// is retried with an incremented attempt counter and the behavior stays the same.
// Otherwise the failure is logged and a TimeoutException is thrown.
def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = {
  val retriesExhausted = f.attempt >= producerControllerSettings.durableQueueRetryAttempts
  if (!retriesExhausted) {
    context.log.info(s"StoreMessageSent seqNr [{}] failed, attempt [{}], retrying.", f.messageSent.seqNr, f.attempt)
    storeMessageSent(f.messageSent, attempt = f.attempt + 1)
    Behaviors.same
  } else {
    val errorMessage =
      s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up."
    context.log.error(errorMessage)
    throw new TimeoutException(errorMessage)
  }
}
// Handles an `Ack` for the entry stored under `ack.outKey`.
//
// Confirmed entries are processed by `onAck`. `usedNanoTime` is refreshed only when the
// Ack actually reduced the number of unconfirmed entries. An Ack for an unknown `outKey`
// is reported as unhandled.
def receiveAck(ack: Ack): Behavior[InternalCommand] =
  s.out.get(ack.outKey) match {
    case None =>
      // obsolete Ack, ConsumerController already deregistered
      Behaviors.unhandled

    case Some(outState) =>
      if (traceEnabled)
        context.log.trace2("Received Ack, confirmed [{}], current [{}].", ack.confirmedSeqNr, s.currentSeqNr)
      val remaining = onAck(outState, ack.confirmedSeqNr)
      val madeProgress = remaining.size != outState.unconfirmed.size
      val usedAt = if (madeProgress) System.nanoTime() else outState.usedNanoTime
      val ackedOut = outState.copy(unconfirmed = remaining, usedNanoTime = usedAt)
      active(s.copy(out = s.out.updated(ack.outKey, ackedOut)))
  }
// Handles demand (`RequestNext`) signalled for the entry stored under `w.next.producerId`.
//
// The `confirmedSeqNr` carried by the request is first applied through `onAck`. Then:
//  - if messages are buffered for that entry, the oldest one is sent and the demand is consumed;
//  - otherwise the demand is remembered in `nextTo` and an updated RequestNext is sent
//    to the producer.
// Throws IllegalStateException if demand is already registered for the entry.
// A request for an unknown key is only logged.
def receiveWrappedRequestNext(w: WrappedRequestNext[A]): Behavior[InternalCommand] = {
  val demand = w.next
  val outKey = demand.producerId
  s.out.get(outKey) match {
    case None =>
      // if ProducerController was stopped and there was a RequestNext in flight, but will not happen in practice
      context.log.warn("Received RequestNext for unknown [{}]", outKey)
      Behaviors.same

    case Some(out) =>
      if (out.nextTo.nonEmpty)
        throw new IllegalStateException(s"Received RequestNext but already has demand for [$outKey]")

      val confirmedSeqNr = demand.confirmedSeqNr
      if (traceEnabled)
        context.log.trace("Received RequestNext from [{}], confirmed seqNr [{}]", out.entityId, confirmedSeqNr)
      val stillUnconfirmed = onAck(out, confirmedSeqNr)

      out.buffered.headOption match {
        case Some(oldest) =>
          // use the new demand right away for the oldest buffered message
          send(oldest.msg, outKey, out.seqNr, demand)
          val unconfirmedAfterSend = stillUnconfirmed :+ Unconfirmed(oldest.totalSeqNr, out.seqNr, oldest.replyTo)
          val sentOut = out.copy(
            seqNr = out.seqNr + 1,
            nextTo = None,
            unconfirmed = unconfirmedAfterSend,
            buffered = out.buffered.tail,
            usedNanoTime = System.nanoTime())
          active(s.copy(out = s.out.updated(outKey, sentOut)))

        case None =>
          // nothing to send yet, keep the demand for the next message
          val waitingOut =
            out.copy(nextTo = Some(demand), unconfirmed = stillUnconfirmed, usedNanoTime = System.nanoTime())
          val newState = s.copy(out = s.out.updated(outKey, waitingOut))
          // send an updated RequestNext
          s.producer ! createRequestNext(newState)
          active(newState)
      }
  }
}
// Registers `start.producer` as the producer of this controller.
//
// The producer is validated with `ProducerControllerImpl.enforceLocalProducer`, then
// receives a RequestNext created from the current state, and finally replaces the
// producer kept in the state.
def receiveStart(start: Start[A]): Behavior[InternalCommand] = {
  val newProducer = start.producer
  ProducerControllerImpl.enforceLocalProducer(newProducer)
  context.log.debug("Register new Producer [{}], currentSeqNr [{}].", start.producer, s.currentSeqNr)
  newProducer ! createRequestNext(s)
  val stateWithProducer = s.copy(producer = newProducer)
  active(stateWithProducer)
}
// Asks the ProducerController of every idle entry to resend its first unconfirmed message.
//
// An entry qualifies when it has unconfirmed messages and the time since its
// `usedNanoTime` is at least `settings.resendFirstUnconfirmedIdleTimeout`.
// The state is not modified.
def receiveResendFirstUnconfirmed(): Behavior[InternalCommand] = {
  val now = System.nanoTime()
  val idleTimeoutMillis = settings.resendFirstUnconfirmedIdleTimeout.toMillis
  s.out.foreach {
    case (outKey: OutKey, outState) =>
      val idleDurationMillis = (now - outState.usedNanoTime) / 1000000L
      val shouldResend = outState.unconfirmed.nonEmpty && idleDurationMillis >= idleTimeoutMillis
      if (shouldResend) {
        context.log.debug(
          "Resend first unconfirmed for [{}], because it was idle for [{} ms]",
          outKey,
          idleDurationMillis)
        outState.producerController
          .unsafeUpcast[ProducerControllerImpl.InternalCommand] ! ProducerControllerImpl.ResendFirstUnconfirmed
      }
  }
  Behaviors.same
}
// Stops and forgets the ProducerControllers of entries that are no longer in use.
//
// An entry is unused when it has neither unconfirmed nor buffered messages and the time
// since its `usedNanoTime` is at least `settings.cleanupUnusedAfter`.
// The behavior only changes when at least one entry was removed.
def receiveCleanupUnused(): Behavior[InternalCommand] = {
  val now = System.nanoTime()
  val cleanupAfterMillis = settings.cleanupUnusedAfter.toMillis
  val removeOutKeys =
    s.out.flatMap {
      case (outKey: OutKey, outState) =>
        val idleDurationMillis = (now - outState.usedNanoTime) / 1000000L
        val inUse =
          outState.unconfirmed.nonEmpty || outState.buffered.nonEmpty || idleDurationMillis < cleanupAfterMillis
        if (inUse)
          None
        else {
          context.log.debug("Cleanup unused [{}], because it was idle for [{} ms]", outKey, idleDurationMillis)
          context.stop(outState.producerController)
          Some(outKey)
        }
    }
  if (removeOutKeys.nonEmpty)
    active(s.copy(out = s.out -- removeOutKeys))
  else
    Behaviors.same
}
// Message dispatch for the active state: every InternalCommand is routed to its handler above.
// Anything not matched explicitly is treated as a programming error and fails the actor.
Behaviors.receiveMessage {
  case msg: Msg[A @unchecked] =>
    if (durableQueue.isEmpty) {
      // currentSeqNr is only updated when durableQueue is enabled
      onMessage(msg.envelope.entityId, msg.envelope.message, None, s.currentSeqNr, s.replyAfterStore)
    } else if (msg.isAlreadyStored) {
      // loaded from durable queue: deliver under the already stored seqNr, currentSeqNr is left unchanged
      onMessage(msg.envelope.entityId, msg.envelope.message, None, msg.alreadyStored, s.replyAfterStore)
    } else {
      // store first; onMessage is invoked from the StoreMessageSentCompleted case below
      storeMessageSent(
        MessageSent(
          s.currentSeqNr,
          msg.envelope.message,
          ack = false,
          msg.envelope.entityId,
          System.currentTimeMillis()),
        attempt = 1)
      active(s.copy(currentSeqNr = s.currentSeqNr + 1))
    }

  case MessageWithConfirmation(entityId, message: A, replyTo) =>
    if (durableQueue.isEmpty) {
      // without a durable queue the replyTo travels with the message until it is confirmed
      onMessage(entityId, message, Some(replyTo), s.currentSeqNr, s.replyAfterStore)
    } else {
      // with a durable queue the reply is sent once the message has been stored,
      // so remember the replyTo under the seqNr used for storing
      storeMessageSent(
        MessageSent(s.currentSeqNr, message, ack = true, entityId, System.currentTimeMillis()),
        attempt = 1)
      val newReplyAfterStore = s.replyAfterStore.updated(s.currentSeqNr, replyTo)
      active(s.copy(currentSeqNr = s.currentSeqNr + 1, replyAfterStore = newReplyAfterStore))
    }

  case StoreMessageSentCompleted(MessageSent(seqNr, msg: A, _, entityId, _)) =>
    receiveStoreMessageSentCompleted(seqNr, msg, entityId)

  case f: StoreMessageSentFailed[A @unchecked] =>
    receiveStoreMessageSentFailed(f)

  case ack: Ack =>
    receiveAck(ack)

  case w: WrappedRequestNext[A @unchecked] =>
    receiveWrappedRequestNext(w)

  case ResendFirstUnconfirmed =>
    receiveResendFirstUnconfirmed()

  case CleanupUnused =>
    receiveCleanupUnused()

  case start: Start[A @unchecked] =>
    receiveStart(start)

  case AskTimeout(outKey, outSeqNr) =>
    // only logged here, the state is left as it is
    context.log.debug(
      "Message seqNr [{}] sent to entity [{}] timed out. It will be redelivered.",
      outSeqNr,
      outKey)
    Behaviors.same

  case DurableQueueTerminated =>
    throw new IllegalStateException("DurableQueue was unexpectedly terminated.")

  case unexpected =>
    throw new RuntimeException(s"Unexpected message: $unexpected")
}
}