in actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala [404:842]
private def active(s: State[A]): Behavior[InternalCommand] = {
def onMsg(
seqMsg: SequencedMessage[A],
newReplyAfterStore: Map[SeqNr, ActorRef[SeqNr]],
newRemainingChunks: immutable.Seq[SequencedMessage[A]]): Behavior[InternalCommand] = {
checkOnMsgRequestedState()
if (seqMsg.isLastChunk != newRemainingChunks.isEmpty)
throw new IllegalStateException(
s"seqMsg [${seqMsg.seqNr}] was lastChunk but remaining [${newRemainingChunks.size}] chunks.")
if (traceEnabled)
context.log.trace("Sending [{}] with seqNr [{}].", seqMsg.message.getClass.getName, s.currentSeqNr)
val newUnconfirmed =
if (s.supportResend) s.unconfirmed :+ seqMsg
else Vector.empty // no resending, no need to keep unconfirmed
if (s.currentSeqNr == s.firstSeqNr)
timers.startTimerWithFixedDelay(ResendFirst, delay = settings.durableQueueResendFirstInterval)
flightRecorder.producerSent(producerId, seqMsg.seqNr)
s.send(seqMsg)
val newRequested =
if (s.currentSeqNr == s.requestedSeqNr) {
flightRecorder.producerWaitingForRequest(producerId, s.currentSeqNr)
newRemainingChunks.nonEmpty // keep it true until lastChunk
} else if (seqMsg.isLastChunk) {
flightRecorder.producerRequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr)
s.producer ! RequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr, msgAdapter, context.self)
true
} else {
context.self ! SendChunk
true // keep it true until lastChunk
}
active(
s.copy(
requested = newRequested,
currentSeqNr = s.currentSeqNr + 1,
replyAfterStore = newReplyAfterStore,
unconfirmed = newUnconfirmed,
remainingChunks = newRemainingChunks,
storeMessageSentInProgress = 0))
}
def checkOnMsgRequestedState(): Unit = {
if (!s.requested || s.currentSeqNr > s.requestedSeqNr) {
throw new IllegalStateException(
s"Unexpected Msg when no demand, requested ${s.requested}, " +
s"requestedSeqNr ${s.requestedSeqNr}, currentSeqNr ${s.currentSeqNr}")
}
}
def checkReceiveMessageRemainingChunksState(): Unit = {
if (s.remainingChunks.nonEmpty)
throw new IllegalStateException(
s"Received unexpected message before sending remaining [${s.remainingChunks.size}] chunks.")
}
def receiveRequest(
newConfirmedSeqNr: SeqNr,
newRequestedSeqNr: SeqNr,
supportResend: Boolean,
viaTimeout: Boolean): Behavior[InternalCommand] = {
flightRecorder.producerReceivedRequest(producerId, newRequestedSeqNr, newConfirmedSeqNr)
context.log.debugN(
"Received Request, confirmed [{}], requested [{}], current [{}]",
newConfirmedSeqNr,
newRequestedSeqNr,
s.currentSeqNr)
val stateAfterAck = onAck(newConfirmedSeqNr)
val newUnconfirmed =
if (supportResend) stateAfterAck.unconfirmed
else Vector.empty
if ((viaTimeout || newConfirmedSeqNr == s.firstSeqNr) && supportResend) {
// the last message was lost and no more message was sent that would trigger Resend
resendUnconfirmed(newUnconfirmed)
}
// when supportResend=false the requestedSeqNr window must be expanded if all sent messages were lost
val newRequestedSeqNr2 =
if (!supportResend && newRequestedSeqNr <= stateAfterAck.currentSeqNr)
stateAfterAck.currentSeqNr + (newRequestedSeqNr - newConfirmedSeqNr)
else
newRequestedSeqNr
if (newRequestedSeqNr2 != newRequestedSeqNr)
context.log.debugN(
"Expanded requestedSeqNr from [{}] to [{}], because current [{}] and all were probably lost",
newRequestedSeqNr,
newRequestedSeqNr2,
stateAfterAck.currentSeqNr)
if (newRequestedSeqNr2 > s.requestedSeqNr) {
val newRequested =
if (s.storeMessageSentInProgress != 0) {
s.requested
} else if (s.remainingChunks.nonEmpty) {
context.self ! SendChunk
s.requested
} else if (!s.requested && (newRequestedSeqNr2 - s.currentSeqNr) > 0) {
flightRecorder.producerRequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr)
s.producer ! RequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr, msgAdapter, context.self)
true
} else {
s.requested
}
active(
stateAfterAck.copy(
requested = newRequested,
requestedSeqNr = newRequestedSeqNr2,
supportResend = supportResend,
unconfirmed = newUnconfirmed))
} else {
active(stateAfterAck.copy(supportResend = supportResend, unconfirmed = newUnconfirmed))
}
}
def receiveAck(newConfirmedSeqNr: SeqNr): Behavior[InternalCommand] = {
if (traceEnabled)
context.log.trace2("Received Ack, confirmed [{}], current [{}].", newConfirmedSeqNr, s.currentSeqNr)
val stateAfterAck = onAck(newConfirmedSeqNr)
if (newConfirmedSeqNr == s.firstSeqNr && stateAfterAck.unconfirmed.nonEmpty) {
resendUnconfirmed(stateAfterAck.unconfirmed)
}
active(stateAfterAck)
}
def onAck(newConfirmedSeqNr: SeqNr): State[A] = {
val (replies, newReplyAfterStore) = s.replyAfterStore.partition { case (seqNr, _) => seqNr <= newConfirmedSeqNr }
if (replies.nonEmpty && traceEnabled)
context.log.trace("Sending confirmation replies from [{}] to [{}].", replies.head._1, replies.last._1)
replies.foreach {
case (seqNr, replyTo) => replyTo ! seqNr
}
val newUnconfirmed =
if (s.supportResend) s.unconfirmed.dropWhile(_.seqNr <= newConfirmedSeqNr)
else Vector.empty
if (newConfirmedSeqNr == s.firstSeqNr)
timers.cancel(ResendFirst)
val newMaxConfirmedSeqNr = math.max(s.confirmedSeqNr, newConfirmedSeqNr)
durableQueue.foreach { d =>
// Storing the confirmedSeqNr can be "write behind", at-least-once delivery
// TODO #28721 to reduce number of writes, consider to only StoreMessageConfirmed for the Request messages and not for each Ack
if (newMaxConfirmedSeqNr != s.confirmedSeqNr)
d ! StoreMessageConfirmed(newMaxConfirmedSeqNr, NoQualifier, System.currentTimeMillis())
}
s.copy(confirmedSeqNr = newMaxConfirmedSeqNr, replyAfterStore = newReplyAfterStore, unconfirmed = newUnconfirmed)
}
def receiveStoreMessageSentCompleted(seqNr: SeqNr): Behavior[InternalCommand] = {
if (seqNr == s.storeMessageSentInProgress) {
if (seqNr != s.currentSeqNr)
throw new IllegalStateException(s"currentSeqNr [${s.currentSeqNr}] not matching stored seqNr [$seqNr]")
val seqMsg = s.remainingChunks.head
if (seqNr != seqMsg.seqNr)
throw new IllegalStateException(s"seqNr [${seqMsg.seqNr}] not matching stored seqNr [$seqNr]")
s.replyAfterStore.get(seqNr).foreach { replyTo =>
if (traceEnabled)
context.log.trace("Sending confirmation reply to [{}] after storage.", seqNr)
replyTo ! seqNr
}
val newReplyAfterStore = s.replyAfterStore - seqNr
onMsg(seqMsg, newReplyAfterStore, s.remainingChunks.tail)
} else {
context.log.debug(
"Received StoreMessageSentCompleted for seqNr [{}] but waiting for [{}]. " +
"Probably due to retry.",
seqNr,
s.storeMessageSentInProgress)
Behaviors.same
}
}
def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = {
if (f.messageSent.seqNr == s.storeMessageSentInProgress) {
if (f.attempt >= settings.durableQueueRetryAttempts) {
val errorMessage =
s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up."
context.log.error(errorMessage)
throw new TimeoutException(errorMessage)
} else {
context.log.warnN(
"StoreMessageSent seqNr [{}] failed, attempt [{}] of [{}], retrying.",
f.messageSent.seqNr,
f.attempt,
settings.durableQueueRetryAttempts)
// retry
if (f.messageSent.isFirstChunk) {
storeMessageSent(f.messageSent, attempt = f.attempt + 1)
Behaviors.same
} else {
// store all chunks again, because partially stored chunks are discarded by the DurableQueue
// when it's restarted
val unconfirmedReverse = s.unconfirmed.reverse
val xs = unconfirmedReverse.takeWhile(!_.isFirstChunk)
if (unconfirmedReverse.size == xs.size)
throw new IllegalStateException(s"First chunk not found in unconfirmed: ${s.unconfirmed}")
val firstChunk = unconfirmedReverse.drop(xs.size).head
val newRemainingChunks = (firstChunk +: xs.reverse) ++ s.remainingChunks
val newUnconfirmed = s.unconfirmed.dropRight(xs.size + 1)
context.log.debug(
"Store all [{}] chunks again, starting at seqNr [{}].",
newRemainingChunks.size,
firstChunk.seqNr)
if (!newRemainingChunks.head.isFirstChunk || !newRemainingChunks.last.isLastChunk)
throw new IllegalStateException(s"Wrong remainingChunks: $newRemainingChunks")
storeMessageSent(
MessageSent.fromMessageOrChunked(
firstChunk.seqNr,
firstChunk.message,
firstChunk.ack,
NoQualifier,
System.currentTimeMillis()),
attempt = f.attempt + 1)
active(
s.copy(
storeMessageSentInProgress = firstChunk.seqNr,
remainingChunks = newRemainingChunks,
unconfirmed = newUnconfirmed,
currentSeqNr = firstChunk.seqNr))
}
}
} else {
Behaviors.same
}
}
def receiveResend(fromSeqNr: SeqNr): Behavior[InternalCommand] = {
flightRecorder.producerReceivedResend(producerId, fromSeqNr)
resendUnconfirmed(s.unconfirmed.dropWhile(_.seqNr < fromSeqNr))
if (fromSeqNr == 0 && s.unconfirmed.nonEmpty) {
val newUnconfirmed = s.unconfirmed.head.asFirst +: s.unconfirmed.tail
active(s.copy(unconfirmed = newUnconfirmed))
} else
Behaviors.same
}
def resendUnconfirmed(newUnconfirmed: Vector[SequencedMessage[A]]): Unit = {
if (newUnconfirmed.nonEmpty) {
val fromSeqNr = newUnconfirmed.head.seqNr
val toSeqNr = newUnconfirmed.last.seqNr
flightRecorder.producerResentUnconfirmed(producerId, fromSeqNr, toSeqNr)
context.log.debug("Resending [{} - {}].", fromSeqNr, toSeqNr)
newUnconfirmed.foreach(s.send)
}
}
def receiveResendFirstUnconfirmed(): Behavior[InternalCommand] = {
if (s.unconfirmed.nonEmpty) {
flightRecorder.producerResentFirstUnconfirmed(producerId, s.unconfirmed.head.seqNr)
context.log.debug("Resending first unconfirmed [{}].", s.unconfirmed.head.seqNr)
s.send(s.unconfirmed.head)
}
Behaviors.same
}
def receiveResendFirst(): Behavior[InternalCommand] = {
if (s.unconfirmed.nonEmpty && s.unconfirmed.head.seqNr == s.firstSeqNr) {
flightRecorder.producerResentFirst(producerId, s.firstSeqNr)
context.log.debug("Resending first, [{}].", s.firstSeqNr)
s.send(s.unconfirmed.head.asFirst)
} else {
if (s.currentSeqNr > s.firstSeqNr)
timers.cancel(ResendFirst)
}
Behaviors.same
}
def receiveStart(start: Start[A]): Behavior[InternalCommand] = {
ProducerControllerImpl.enforceLocalProducer(start.producer)
context.log.debug("Register new Producer [{}], currentSeqNr [{}].", start.producer, s.currentSeqNr)
if (s.requested && s.remainingChunks.isEmpty) {
flightRecorder.producerRequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr)
start.producer ! RequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr, msgAdapter, context.self)
}
active(s.copy(producer = start.producer))
}
def receiveRegisterConsumer(
consumerController: ActorRef[ConsumerController.Command[A]]): Behavior[InternalCommand] = {
val newFirstSeqNr =
if (s.unconfirmed.isEmpty) s.currentSeqNr
else s.unconfirmed.head.seqNr
context.log.debug(
"Register new ConsumerController [{}], starting with seqNr [{}].",
consumerController,
newFirstSeqNr)
if (s.unconfirmed.nonEmpty) {
timers.startTimerWithFixedDelay(ResendFirst, delay = settings.durableQueueResendFirstInterval)
context.self ! ResendFirst
}
// update the send function
val newSend = consumerController.tell(_)
active(s.copy(firstSeqNr = newFirstSeqNr, send = newSend))
}
def receiveSendChunk(): Behavior[InternalCommand] = {
if (s.remainingChunks.nonEmpty && s.remainingChunks.head.seqNr <= s.requestedSeqNr && s.storeMessageSentInProgress == 0) {
if (traceEnabled)
context.log.trace("Send next chunk seqNr [{}].", s.remainingChunks.head.seqNr)
if (durableQueue.isEmpty) {
onMsg(s.remainingChunks.head, s.replyAfterStore, s.remainingChunks.tail)
} else {
val seqMsg = s.remainingChunks.head
storeMessageSent(
MessageSent
.fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()),
attempt = 1)
active(s.copy(storeMessageSentInProgress = seqMsg.seqNr)) // still same s.remainingChunks
}
} else {
Behaviors.same
}
}
def chunk(m: A, ack: Boolean): immutable.Seq[SequencedMessage[A]] = {
val chunkSize = settings.chunkLargeMessagesBytes
if (chunkSize > 0) {
val chunkedMessages = createChunks(m, chunkSize, serialization)
if (traceEnabled) {
if (chunkedMessages.size == 1)
context.log.trace(
"No chunking of seqNr [{}], size [{} bytes].",
s.currentSeqNr,
chunkedMessages.head.serialized.size)
else
context.log.traceN(
"Chunked seqNr [{}] into [{}] pieces, total size [{} bytes].",
s.currentSeqNr,
chunkedMessages.size,
chunkedMessages.map(_.serialized.size).sum)
}
var i = 0
chunkedMessages.map { chunkedMessage =>
val seqNr = s.currentSeqNr + i
i += 1
SequencedMessage.fromChunked[A](
producerId,
seqNr,
chunkedMessage,
seqNr == s.firstSeqNr,
ack && chunkedMessage.lastChunk, // only the last need ack = true
context.self)
}
} else {
val seqMsg =
SequencedMessage[A](producerId, s.currentSeqNr, m, s.currentSeqNr == s.firstSeqNr, ack)(context.self)
seqMsg :: Nil
}
}
Behaviors.receiveMessage {
case MessageWithConfirmation(m: A, replyTo) =>
checkReceiveMessageRemainingChunksState()
flightRecorder.producerReceived(producerId, s.currentSeqNr)
val chunks = chunk(m, ack = true)
val newReplyAfterStore = s.replyAfterStore.updated(chunks.last.seqNr, replyTo)
if (durableQueue.isEmpty) {
onMsg(chunks.head, newReplyAfterStore, chunks.tail)
} else {
val seqMsg = chunks.head
storeMessageSent(
MessageSent
.fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()),
attempt = 1)
active(
s.copy(
replyAfterStore = newReplyAfterStore,
remainingChunks = chunks,
storeMessageSentInProgress = seqMsg.seqNr))
}
case Msg(m: A) =>
checkReceiveMessageRemainingChunksState()
flightRecorder.producerReceived(producerId, s.currentSeqNr)
val chunks = chunk(m, ack = false)
if (durableQueue.isEmpty) {
onMsg(chunks.head, s.replyAfterStore, chunks.tail)
} else {
val seqMsg = chunks.head
storeMessageSent(
MessageSent
.fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()),
attempt = 1)
active(s.copy(remainingChunks = chunks, storeMessageSentInProgress = seqMsg.seqNr))
}
case StoreMessageSentCompleted(sent: MessageSent[_]) =>
receiveStoreMessageSentCompleted(sent.seqNr)
case f: StoreMessageSentFailed[A @unchecked] =>
receiveStoreMessageSentFailed(f)
case Request(newConfirmedSeqNr, newRequestedSeqNr, supportResend, viaTimeout) =>
receiveRequest(newConfirmedSeqNr, newRequestedSeqNr, supportResend, viaTimeout)
case Ack(newConfirmedSeqNr) =>
receiveAck(newConfirmedSeqNr)
case SendChunk =>
receiveSendChunk()
case Resend(fromSeqNr) =>
receiveResend(fromSeqNr)
case ResendFirst =>
receiveResendFirst()
case ResendFirstUnconfirmed =>
receiveResendFirstUnconfirmed()
case start: Start[A @unchecked] =>
receiveStart(start)
case RegisterConsumer(consumerController: ActorRef[ConsumerController.Command[A]] @unchecked) =>
receiveRegisterConsumer(consumerController)
case DurableQueueTerminated =>
throw new IllegalStateException("DurableQueue was unexpectedly terminated.")
case unexpected =>
throw new RuntimeException(s"Unexpected message: $unexpected")
}
}