private def active()

in actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/ProducerControllerImpl.scala [404:842]


  private def active(s: State[A]): Behavior[InternalCommand] = {

    /**
     * Sends `seqMsg` through `s.send` and moves to the next `active` state.
     *
     * Side effects, in this order: demand/consistency checks, optional trace log, starting the
     * `ResendFirst` timer when this is the first sequence number, flight-recorder event, the send
     * itself, and finally either waiting for more demand, asking the producer for the next message
     * (`RequestNext`) or scheduling the next chunk (`SendChunk` to self).
     *
     * @param seqMsg the message (or chunk) to send now
     * @param newReplyAfterStore the `replyAfterStore` map to carry into the next state
     * @param newRemainingChunks chunks that still have to be sent after `seqMsg`
     * @return `active` with `currentSeqNr` incremented by one and `storeMessageSentInProgress` reset to 0
     * @throws IllegalStateException if there is no demand (see `checkOnMsgRequestedState`) or if
     *   `seqMsg.isLastChunk` disagrees with `newRemainingChunks` being empty
     */
    def onMsg(
        seqMsg: SequencedMessage[A],
        newReplyAfterStore: Map[SeqNr, ActorRef[SeqNr]],
        newRemainingChunks: immutable.Seq[SequencedMessage[A]]): Behavior[InternalCommand] = {
      checkOnMsgRequestedState()
      // The last chunk must be the one that leaves no remaining chunks, and vice versa.
      // Report the actual flag because the check fails in both directions.
      if (seqMsg.isLastChunk != newRemainingChunks.isEmpty)
        throw new IllegalStateException(
          s"seqMsg [${seqMsg.seqNr}] isLastChunk [${seqMsg.isLastChunk}] " +
          s"but remaining [${newRemainingChunks.size}] chunks.")
      if (traceEnabled)
        context.log.trace("Sending [{}] with seqNr [{}].", seqMsg.message.getClass.getName, s.currentSeqNr)
      val newUnconfirmed =
        if (s.supportResend) s.unconfirmed :+ seqMsg
        else Vector.empty // no resending, no need to keep unconfirmed

      // the first message is resent periodically until the timer is cancelled (see onAck / receiveResendFirst)
      if (s.currentSeqNr == s.firstSeqNr)
        timers.startTimerWithFixedDelay(ResendFirst, delay = settings.durableQueueResendFirstInterval)

      flightRecorder.producerSent(producerId, seqMsg.seqNr)
      s.send(seqMsg)
      val newRequested =
        if (s.currentSeqNr == s.requestedSeqNr) {
          // the requested window is used up, no RequestNext until a new Request arrives
          flightRecorder.producerWaitingForRequest(producerId, s.currentSeqNr)
          newRemainingChunks.nonEmpty // keep it true until lastChunk
        } else if (seqMsg.isLastChunk) {
          // whole message sent and there is still demand, ask the producer for the next one
          flightRecorder.producerRequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr)
          s.producer ! RequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr, msgAdapter, context.self)
          true
        } else {
          // more chunks of the same message to send, continue via a message to self
          context.self ! SendChunk
          true // keep it true until lastChunk
        }

      active(
        s.copy(
          requested = newRequested,
          currentSeqNr = s.currentSeqNr + 1,
          replyAfterStore = newReplyAfterStore,
          unconfirmed = newUnconfirmed,
          remainingChunks = newRemainingChunks,
          storeMessageSentInProgress = 0))
    }

    /**
     * Guard for `onMsg`: sending is only legal while `s.requested` is set and `s.currentSeqNr`
     * has not passed `s.requestedSeqNr`.
     *
     * @throws IllegalStateException when there is no such demand
     */
    def checkOnMsgRequestedState(): Unit = {
      val hasDemand = s.requested && s.currentSeqNr <= s.requestedSeqNr
      if (!hasDemand)
        throw new IllegalStateException(
          s"Unexpected Msg when no demand, requested ${s.requested}, " +
          s"requestedSeqNr ${s.requestedSeqNr}, currentSeqNr ${s.currentSeqNr}")
    }

    /**
     * Guard for incoming producer messages: a new message must not arrive while chunks of the
     * previous one are still waiting in `s.remainingChunks`.
     *
     * @throws IllegalStateException when `s.remainingChunks` is not empty
     */
    def checkReceiveMessageRemainingChunksState(): Unit = {
      val pending = s.remainingChunks
      if (pending.nonEmpty) {
        val pendingCount = pending.size
        throw new IllegalStateException(
          s"Received unexpected message before sending remaining [$pendingCount] chunks.")
      }
    }

    /**
     * Handles a `Request`: applies the confirmation via `onAck`, optionally resends unconfirmed
     * messages, and, when the requested sequence number grew, decides how to continue sending.
     *
     * @param newConfirmedSeqNr sequence number confirmed by this request
     * @param newRequestedSeqNr sequence number up to which messages are requested
     * @param supportResend whether unconfirmed messages are kept for resending; stored in the next state
     * @param viaTimeout when true (and `supportResend`), the unconfirmed messages are resent
     * @return the next `active` behavior
     */
    def receiveRequest(
        newConfirmedSeqNr: SeqNr,
        newRequestedSeqNr: SeqNr,
        supportResend: Boolean,
        viaTimeout: Boolean): Behavior[InternalCommand] = {
      flightRecorder.producerReceivedRequest(producerId, newRequestedSeqNr, newConfirmedSeqNr)
      context.log.debugN(
        "Received Request, confirmed [{}], requested [{}], current [{}]",
        newConfirmedSeqNr,
        newRequestedSeqNr,
        s.currentSeqNr)

      // onAck sends pending confirmation replies and returns the state with confirmed messages dropped
      val stateAfterAck = onAck(newConfirmedSeqNr)

      // uses the supportResend flag of this Request, not the one stored in s
      val newUnconfirmed =
        if (supportResend) stateAfterAck.unconfirmed
        else Vector.empty

      if ((viaTimeout || newConfirmedSeqNr == s.firstSeqNr) && supportResend) {
        // the last message was lost and no more message was sent that would trigger Resend
        resendUnconfirmed(newUnconfirmed)
      }

      // when supportResend=false the requestedSeqNr window must be expanded if all sent messages were lost
      val newRequestedSeqNr2 =
        if (!supportResend && newRequestedSeqNr <= stateAfterAck.currentSeqNr)
          stateAfterAck.currentSeqNr + (newRequestedSeqNr - newConfirmedSeqNr)
        else
          newRequestedSeqNr
      if (newRequestedSeqNr2 != newRequestedSeqNr)
        context.log.debugN(
          "Expanded requestedSeqNr from [{}] to [{}], because current [{}] and all were probably lost",
          newRequestedSeqNr,
          newRequestedSeqNr2,
          stateAfterAck.currentSeqNr)

      // only a larger requestedSeqNr changes the demand; otherwise just carry over the ack result
      if (newRequestedSeqNr2 > s.requestedSeqNr) {
        val newRequested =
          if (s.storeMessageSentInProgress != 0) {
            // a store is in flight, sending continues from receiveStoreMessageSentCompleted
            s.requested
          } else if (s.remainingChunks.nonEmpty) {
            // chunks of the current message are waiting, continue with the next chunk
            context.self ! SendChunk
            s.requested
          } else if (!s.requested && (newRequestedSeqNr2 - s.currentSeqNr) > 0) {
            // producer was not asked yet and the new window leaves room, ask for the next message
            flightRecorder.producerRequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr)
            s.producer ! RequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr, msgAdapter, context.self)
            true
          } else {
            s.requested
          }

        active(
          stateAfterAck.copy(
            requested = newRequested,
            requestedSeqNr = newRequestedSeqNr2,
            supportResend = supportResend,
            unconfirmed = newUnconfirmed))
      } else {
        active(stateAfterAck.copy(supportResend = supportResend, unconfirmed = newUnconfirmed))
      }
    }

    /**
     * Handles an `Ack`: applies the confirmation through `onAck` and, when the confirmed sequence
     * number is `s.firstSeqNr` while messages are still unconfirmed, resends those messages.
     *
     * @param newConfirmedSeqNr sequence number confirmed by the ack
     * @return `active` with the state produced by `onAck`
     */
    def receiveAck(newConfirmedSeqNr: SeqNr): Behavior[InternalCommand] = {
      if (traceEnabled)
        context.log.trace2("Received Ack, confirmed [{}], current [{}].", newConfirmedSeqNr, s.currentSeqNr)
      val acked = onAck(newConfirmedSeqNr)
      val confirmedIsFirst = newConfirmedSeqNr == s.firstSeqNr
      if (confirmedIsFirst && acked.unconfirmed.nonEmpty)
        resendUnconfirmed(acked.unconfirmed)
      active(acked)
    }

    /**
     * Applies a confirmation up to and including `newConfirmedSeqNr`.
     *
     * Sends the sequence number to every `replyAfterStore` entry at or below the confirmed number,
     * drops confirmed messages from `unconfirmed`, cancels the `ResendFirst` timer when the confirmed
     * number is `s.firstSeqNr`, and tells the durable queue (if any) about a new highest confirmed number.
     *
     * @param newConfirmedSeqNr sequence number confirmed by the consumer side
     * @return a copy of `s` with updated `confirmedSeqNr` (never decreasing), `replyAfterStore`
     *   and `unconfirmed`; the caller decides which behavior to return
     */
    def onAck(newConfirmedSeqNr: SeqNr): State[A] = {
      val (replies, newReplyAfterStore) = s.replyAfterStore.partition { case (seqNr, _) => seqNr <= newConfirmedSeqNr }
      if (replies.nonEmpty && traceEnabled) {
        // Map iteration order is unspecified, so head/last are not reliably the lowest/highest seqNr;
        // derive the logged range from the keys instead.
        val replySeqNrs = replies.keys
        context.log.trace("Sending confirmation replies from [{}] to [{}].", replySeqNrs.min, replySeqNrs.max)
      }
      replies.foreach {
        case (seqNr, replyTo) => replyTo ! seqNr
      }

      val newUnconfirmed =
        if (s.supportResend) s.unconfirmed.dropWhile(_.seqNr <= newConfirmedSeqNr)
        else Vector.empty

      // the first message has been confirmed, periodic resending of it is no longer needed
      if (newConfirmedSeqNr == s.firstSeqNr)
        timers.cancel(ResendFirst)

      // a late ack with a lower number must not move confirmedSeqNr backwards
      val newMaxConfirmedSeqNr = math.max(s.confirmedSeqNr, newConfirmedSeqNr)

      durableQueue.foreach { d =>
        // Storing the confirmedSeqNr can be "write behind", at-least-once delivery
        // TODO #28721 to reduce number of writes, consider to only StoreMessageConfirmed for the Request messages and not for each Ack
        if (newMaxConfirmedSeqNr != s.confirmedSeqNr)
          d ! StoreMessageConfirmed(newMaxConfirmedSeqNr, NoQualifier, System.currentTimeMillis())
      }

      s.copy(confirmedSeqNr = newMaxConfirmedSeqNr, replyAfterStore = newReplyAfterStore, unconfirmed = newUnconfirmed)
    }

    /**
     * Handles completion of a durable-queue store. When `seqNr` is the store currently awaited
     * (`s.storeMessageSentInProgress`), the head of `s.remainingChunks` is sent via `onMsg` after
     * replying to a waiting `replyAfterStore` entry for that number. Any other `seqNr` is logged
     * and ignored.
     *
     * @param seqNr sequence number reported as stored
     * @throws IllegalStateException if `seqNr` does not match `s.currentSeqNr` or the head chunk's seqNr
     */
    def receiveStoreMessageSentCompleted(seqNr: SeqNr): Behavior[InternalCommand] = {
      val awaitedSeqNr = s.storeMessageSentInProgress
      if (seqNr != awaitedSeqNr) {
        context.log.debug(
          "Received StoreMessageSentCompleted for seqNr [{}] but waiting for [{}]. " +
          "Probably due to retry.",
          seqNr,
          awaitedSeqNr)
        Behaviors.same
      } else {
        if (s.currentSeqNr != seqNr)
          throw new IllegalStateException(s"currentSeqNr [${s.currentSeqNr}] not matching stored seqNr [$seqNr]")
        val storedChunk = s.remainingChunks.head
        if (storedChunk.seqNr != seqNr)
          throw new IllegalStateException(s"seqNr [${storedChunk.seqNr}] not matching stored seqNr [$seqNr]")

        s.replyAfterStore.get(seqNr) match {
          case Some(replyTo) =>
            if (traceEnabled)
              context.log.trace("Sending confirmation reply to [{}] after storage.", seqNr)
            replyTo ! seqNr
          case None =>
            () // nobody registered for a reply on this seqNr
        }

        onMsg(storedChunk, s.replyAfterStore - seqNr, s.remainingChunks.tail)
      }
    }

    /**
     * Handles a failed durable-queue store. Failures for a sequence number other than the one
     * currently awaited (`s.storeMessageSentInProgress`) are ignored.
     *
     * While `f.attempt` is below `settings.durableQueueRetryAttempts` the store is retried: a first
     * chunk is simply stored again, whereas for a later chunk the whole chunk series is rewound to
     * its first chunk and stored again from there.
     *
     * @param f the failure, carrying the `MessageSent` that could not be stored and the attempt number
     * @return `Behaviors.same`, or `active` with the rewound state when chunks are stored again
     * @throws TimeoutException when the retry attempts are exhausted
     * @throws IllegalStateException when the first chunk cannot be found in `s.unconfirmed`, or the
     *   rebuilt chunk series does not start with a first chunk and end with a last chunk
     */
    def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = {
      if (f.messageSent.seqNr == s.storeMessageSentInProgress) {
        if (f.attempt >= settings.durableQueueRetryAttempts) {
          val errorMessage =
            s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up."
          context.log.error(errorMessage)
          throw new TimeoutException(errorMessage)
        } else {
          context.log.warnN(
            "StoreMessageSent seqNr [{}] failed, attempt [{}] of [{}], retrying.",
            f.messageSent.seqNr,
            f.attempt,
            settings.durableQueueRetryAttempts)
          // retry
          if (f.messageSent.isFirstChunk) {
            storeMessageSent(f.messageSent, attempt = f.attempt + 1)
            Behaviors.same
          } else {
            // store all chunks again, because partially stored chunks are discarded by the DurableQueue
            // when it's restarted
            val unconfirmedReverse = s.unconfirmed.reverse
            // xs: the already sent chunks after the first chunk, newest first
            val xs = unconfirmedReverse.takeWhile(!_.isFirstChunk)
            if (unconfirmedReverse.size == xs.size)
              throw new IllegalStateException(s"First chunk not found in unconfirmed: ${s.unconfirmed}")
            val firstChunk = unconfirmedReverse.drop(xs.size).head
            // put the already sent chunks back in front of the ones not sent yet, in original order
            val newRemainingChunks = (firstChunk +: xs.reverse) ++ s.remainingChunks
            // remove those chunks (first chunk included) from the end of unconfirmed
            val newUnconfirmed = s.unconfirmed.dropRight(xs.size + 1)

            context.log.debug(
              "Store all [{}] chunks again, starting at seqNr [{}].",
              newRemainingChunks.size,
              firstChunk.seqNr)

            if (!newRemainingChunks.head.isFirstChunk || !newRemainingChunks.last.isLastChunk)
              throw new IllegalStateException(s"Wrong remainingChunks: $newRemainingChunks")

            storeMessageSent(
              MessageSent.fromMessageOrChunked(
                firstChunk.seqNr,
                firstChunk.message,
                firstChunk.ack,
                NoQualifier,
                System.currentTimeMillis()),
              attempt = f.attempt + 1)
            // currentSeqNr goes back to the first chunk, which is now the store being awaited
            active(
              s.copy(
                storeMessageSentInProgress = firstChunk.seqNr,
                remainingChunks = newRemainingChunks,
                unconfirmed = newUnconfirmed,
                currentSeqNr = firstChunk.seqNr))
          }
        }
      } else {
        Behaviors.same
      }
    }

    /**
     * Handles a `Resend`: resends every unconfirmed message whose seqNr is at or above `fromSeqNr`.
     * When `fromSeqNr` is 0 and there are unconfirmed messages, the oldest one is replaced by its
     * `asFirst` variant in the next state.
     *
     * @param fromSeqNr lowest sequence number to resend
     */
    def receiveResend(fromSeqNr: SeqNr): Behavior[InternalCommand] = {
      flightRecorder.producerReceivedResend(producerId, fromSeqNr)
      val toResend = s.unconfirmed.dropWhile(_.seqNr < fromSeqNr)
      resendUnconfirmed(toResend)
      s.unconfirmed.headOption match {
        case Some(oldest) if fromSeqNr == 0 =>
          active(s.copy(unconfirmed = oldest.asFirst +: s.unconfirmed.tail))
        case _ =>
          Behaviors.same
      }
    }

    /**
     * Sends every message in `newUnconfirmed` again through `s.send`, after recording and logging
     * the covered seqNr range. Does nothing for an empty vector.
     *
     * @param newUnconfirmed messages to resend, in the order they are sent
     */
    def resendUnconfirmed(newUnconfirmed: Vector[SequencedMessage[A]]): Unit =
      newUnconfirmed.headOption.foreach { oldest =>
        val fromSeqNr = oldest.seqNr
        val toSeqNr = newUnconfirmed.last.seqNr
        flightRecorder.producerResentUnconfirmed(producerId, fromSeqNr, toSeqNr)
        context.log.debug("Resending [{} - {}].", fromSeqNr, toSeqNr)
        newUnconfirmed.foreach(msg => s.send(msg))
      }

    /**
     * Handles `ResendFirstUnconfirmed`: sends the oldest unconfirmed message again, if there is one.
     * The state is never changed.
     */
    def receiveResendFirstUnconfirmed(): Behavior[InternalCommand] = {
      s.unconfirmed.headOption.foreach { oldest =>
        flightRecorder.producerResentFirstUnconfirmed(producerId, oldest.seqNr)
        context.log.debug("Resending first unconfirmed [{}].", oldest.seqNr)
        s.send(oldest)
      }
      Behaviors.same
    }

    /**
     * Handles the `ResendFirst` timer tick. While the oldest unconfirmed message still carries
     * `s.firstSeqNr` it is sent again as `asFirst`; otherwise the timer is cancelled once
     * `s.currentSeqNr` has moved past `s.firstSeqNr`. The state is never changed.
     */
    def receiveResendFirst(): Behavior[InternalCommand] = {
      s.unconfirmed.headOption match {
        case Some(oldest) if oldest.seqNr == s.firstSeqNr =>
          flightRecorder.producerResentFirst(producerId, s.firstSeqNr)
          context.log.debug("Resending first, [{}].", s.firstSeqNr)
          s.send(oldest.asFirst)
        case _ =>
          if (s.currentSeqNr > s.firstSeqNr)
            timers.cancel(ResendFirst)
      }
      Behaviors.same
    }

    /**
     * Handles `Start`: registers `start.producer` as the producer of the next state. If a message
     * is currently requested and no chunks are pending, the new producer immediately gets a
     * `RequestNext` for `s.currentSeqNr`.
     *
     * @param start carries the producer to register; checked with `enforceLocalProducer` first
     */
    def receiveStart(start: Start[A]): Behavior[InternalCommand] = {
      val newProducer = start.producer
      ProducerControllerImpl.enforceLocalProducer(newProducer)
      context.log.debug("Register new Producer [{}], currentSeqNr [{}].", newProducer, s.currentSeqNr)
      val demandOutstanding = s.requested && s.remainingChunks.isEmpty
      if (demandOutstanding) {
        val nextSeqNr = s.currentSeqNr
        val confirmed = s.confirmedSeqNr
        flightRecorder.producerRequestNext(producerId, nextSeqNr, confirmed)
        newProducer ! RequestNext(producerId, nextSeqNr, confirmed, msgAdapter, context.self)
      }
      active(s.copy(producer = newProducer))
    }

    /**
     * Handles `RegisterConsumer`: switches `send` to the given consumer controller and resets
     * `firstSeqNr` to the oldest unconfirmed seqNr, or to `s.currentSeqNr` when nothing is
     * unconfirmed. With unconfirmed messages present, the `ResendFirst` timer is started and a
     * `ResendFirst` is sent to self right away.
     *
     * @param consumerController the consumer controller that receives messages from now on
     */
    def receiveRegisterConsumer(
        consumerController: ActorRef[ConsumerController.Command[A]]): Behavior[InternalCommand] = {
      val newFirstSeqNr = s.unconfirmed.headOption.fold(s.currentSeqNr)(_.seqNr)
      context.log.debug(
        "Register new ConsumerController [{}], starting with seqNr [{}].",
        consumerController,
        newFirstSeqNr)
      val hasUnconfirmed = s.unconfirmed.nonEmpty
      if (hasUnconfirmed) {
        timers.startTimerWithFixedDelay(ResendFirst, delay = settings.durableQueueResendFirstInterval)
        context.self ! ResendFirst
      }
      // from now on messages go to the newly registered consumer controller
      val sendToConsumer = consumerController.tell(_)
      active(s.copy(firstSeqNr = newFirstSeqNr, send = sendToConsumer))
    }

    /**
     * Handles `SendChunk`: continues with the next pending chunk when it is within the requested
     * window and no store is in progress. Without a durable queue the chunk is sent directly via
     * `onMsg`; with one, the chunk is stored first and stays at the head of `s.remainingChunks`.
     * In every other situation the message is ignored.
     */
    def receiveSendChunk(): Behavior[InternalCommand] = {
      s.remainingChunks.headOption match {
        case Some(nextChunk) if nextChunk.seqNr <= s.requestedSeqNr && s.storeMessageSentInProgress == 0 =>
          if (traceEnabled)
            context.log.trace("Send next chunk seqNr [{}].", nextChunk.seqNr)
          if (durableQueue.nonEmpty) {
            val toStore = MessageSent.fromMessageOrChunked(
              nextChunk.seqNr,
              nextChunk.message,
              nextChunk.ack,
              NoQualifier,
              System.currentTimeMillis())
            storeMessageSent(toStore, attempt = 1)
            // remainingChunks is left as is, the chunk is taken off once the store completes
            active(s.copy(storeMessageSentInProgress = nextChunk.seqNr))
          } else {
            onMsg(nextChunk, s.replyAfterStore, s.remainingChunks.tail)
          }
        case _ =>
          Behaviors.same
      }
    }

    /**
     * Turns a producer message into one or more `SequencedMessage`s, numbered consecutively
     * starting at `s.currentSeqNr`.
     *
     * With `settings.chunkLargeMessagesBytes > 0` the message goes through `createChunks` and each
     * resulting chunk becomes one `SequencedMessage`; otherwise a single `SequencedMessage` wrapping
     * `m` is returned.
     *
     * @param m the message from the producer
     * @param ack whether confirmation is wanted; for chunked messages only the last chunk carries it
     * @return the messages to send, in sending order
     */
    def chunk(m: A, ack: Boolean): immutable.Seq[SequencedMessage[A]] = {
      val chunkSize = settings.chunkLargeMessagesBytes
      if (chunkSize > 0) {
        val chunkedMessages = createChunks(m, chunkSize, serialization)

        if (traceEnabled) {
          if (chunkedMessages.size == 1)
            context.log.trace(
              "No chunking of seqNr [{}], size [{} bytes].",
              s.currentSeqNr,
              chunkedMessages.head.serialized.size)
          else
            context.log.traceN(
              "Chunked seqNr [{}] into [{}] pieces, total size [{} bytes].",
              s.currentSeqNr,
              chunkedMessages.size,
              chunkedMessages.map(_.serialized.size).sum)
        }

        // The seqNr offset is the chunk's position, taken from zipWithIndex rather than a counter
        // mutated inside the closure, so numbering does not depend on evaluation order.
        chunkedMessages.zipWithIndex.map {
          case (chunkedMessage, offset) =>
            val seqNr = s.currentSeqNr + offset
            SequencedMessage.fromChunked[A](
              producerId,
              seqNr,
              chunkedMessage,
              seqNr == s.firstSeqNr,
              ack && chunkedMessage.lastChunk, // only the last need ack = true
              context.self)
        }
      } else {
        val seqMsg =
          SequencedMessage[A](producerId, s.currentSeqNr, m, s.currentSeqNr == s.firstSeqNr, ack)(context.self)
        seqMsg :: Nil
      }
    }

    // Message dispatch for the active state. Every case delegates to one of the local handlers
    // above, which return the next behavior (a new `active(...)` or `Behaviors.same`).
    Behaviors.receiveMessage {
      // Producer message that wants a confirmation reply: the reply is registered under the seqNr
      // of the last chunk.
      case MessageWithConfirmation(m: A, replyTo) =>
        checkReceiveMessageRemainingChunksState()
        flightRecorder.producerReceived(producerId, s.currentSeqNr)
        val chunks = chunk(m, ack = true)
        val newReplyAfterStore = s.replyAfterStore.updated(chunks.last.seqNr, replyTo)
        if (durableQueue.isEmpty) {
          // no durable queue, send the first chunk right away
          onMsg(chunks.head, newReplyAfterStore, chunks.tail)
        } else {
          // store the first chunk; it is sent from receiveStoreMessageSentCompleted, so all chunks
          // (including the first) stay in remainingChunks
          val seqMsg = chunks.head
          storeMessageSent(
            MessageSent
              .fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()),
            attempt = 1)
          active(
            s.copy(
              replyAfterStore = newReplyAfterStore,
              remainingChunks = chunks,
              storeMessageSentInProgress = seqMsg.seqNr))
        }

      // Producer message without confirmation reply; otherwise the same flow as above.
      case Msg(m: A) =>
        checkReceiveMessageRemainingChunksState()
        flightRecorder.producerReceived(producerId, s.currentSeqNr)
        val chunks = chunk(m, ack = false)
        if (durableQueue.isEmpty) {
          onMsg(chunks.head, s.replyAfterStore, chunks.tail)
        } else {
          val seqMsg = chunks.head
          storeMessageSent(
            MessageSent
              .fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()),
            attempt = 1)
          active(s.copy(remainingChunks = chunks, storeMessageSentInProgress = seqMsg.seqNr))
        }

      // Outcome of a durable queue store
      case StoreMessageSentCompleted(sent: MessageSent[_]) =>
        receiveStoreMessageSentCompleted(sent.seqNr)

      case f: StoreMessageSentFailed[A @unchecked] =>
        receiveStoreMessageSentFailed(f)

      // Flow control and confirmations
      case Request(newConfirmedSeqNr, newRequestedSeqNr, supportResend, viaTimeout) =>
        receiveRequest(newConfirmedSeqNr, newRequestedSeqNr, supportResend, viaTimeout)

      case Ack(newConfirmedSeqNr) =>
        receiveAck(newConfirmedSeqNr)

      // Sent to self to continue with the next chunk of the current message
      case SendChunk =>
        receiveSendChunk()

      // Resending
      case Resend(fromSeqNr) =>
        receiveResend(fromSeqNr)

      case ResendFirst =>
        receiveResendFirst()

      case ResendFirstUnconfirmed =>
        receiveResendFirstUnconfirmed()

      // (Re-)registration of producer and consumer controller
      case start: Start[A @unchecked] =>
        receiveStart(start)

      case RegisterConsumer(consumerController: ActorRef[ConsumerController.Command[A]] @unchecked) =>
        receiveRegisterConsumer(consumerController)

      // Losing the durable queue is treated as fatal
      case DurableQueueTerminated =>
        throw new IllegalStateException("DurableQueue was unexpectedly terminated.")

      // Any other message is not expected in this state
      case unexpected =>
        throw new RuntimeException(s"Unexpected message: $unexpected")
    }
  }