private def active()

in cluster-sharding-typed/src/main/scala/org/apache/pekko/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala [300:582]


  private def active(s: State[A]): Behavior[InternalCommand] = {

    // Routes one message towards the ProducerController of `entityId` and returns the behavior
    // for the resulting state. Three situations are distinguished by the OutState registered
    // under the entity's out key:
    //  - none registered: a ProducerController child is spawned and the message is buffered
    //  - registered with pending demand (`nextTo` defined): the message is sent immediately
    //  - registered without demand: the message is appended to the entity's buffer
    //
    // `replyTo` is stored together with the message (see `onAck`, which replies `Done` to it),
    // `totalSeqNr` is recorded in the Buffered/Unconfirmed entry and `newReplyAfterStore`
    // becomes the `replyAfterStore` of the new state.
    //
    // Throws IllegalArgumentException when the message has to be buffered for an already
    // registered entity while `s.bufferSize >= settings.bufferSize`.
    def onMessage(
        entityId: EntityId,
        msg: A,
        replyTo: Option[ActorRef[Done]],
        totalSeqNr: TotalSeqNr,
        newReplyAfterStore: Map[TotalSeqNr, ActorRef[Done]]): Behavior[InternalCommand] = {

      val entityOutKey = s"$producerId-$entityId"

      val nextState: State[A] =
        s.out.get(entityOutKey) match {
          case None =>
            // first message for this out key: start a dedicated ProducerController and park the
            // message in its buffer until that controller asks for it
            context.log.debug("Creating ProducerController for entity [{}]", entityId)
            val sendToRegion: ConsumerController.SequencedMessage[A] => Unit =
              sequenced => region ! ShardingEnvelope(entityId, sequenced)
            val controller = context.spawn(
              ProducerController[A](
                entityOutKey,
                durableQueueBehavior = None,
                producerControllerSettings,
                sendToRegion),
              entityId,
              DispatcherSelector.sameAsParent())
            controller ! ProducerController.Start(requestNextAdapter)
            val freshOut: OutState[A] =
              OutState(
                entityId,
                controller,
                None,
                Vector(Buffered(totalSeqNr, msg, replyTo)),
                1L,
                Vector.empty,
                System.nanoTime())
            s.copy(out = s.out.updated(entityOutKey, freshOut), replyAfterStore = newReplyAfterStore)

          case Some(out) =>
            out.nextTo match {
              case Some(demand) =>
                // demand is pending: deliver right away, consume the demand and keep the
                // message as unconfirmed
                send(msg, entityOutKey, out.seqNr, demand)
                val sentOut = out.copy(
                  seqNr = out.seqNr + 1,
                  nextTo = None,
                  unconfirmed = out.unconfirmed :+ Unconfirmed(totalSeqNr, out.seqNr, replyTo),
                  usedNanoTime = System.nanoTime())
                s.copy(out = s.out.updated(entityOutKey, sentOut), replyAfterStore = newReplyAfterStore)

              case None =>
                // no demand: the message has to wait in the buffer, unless the limit is reached
                if (s.bufferSize >= settings.bufferSize)
                  throw new IllegalArgumentException(s"Buffer is full, size [${settings.bufferSize}].")
                val grownBuffer = out.buffered :+ Buffered(totalSeqNr, msg, replyTo)
                context.log.debug(
                  "Buffering message to entityId [{}], buffer size for entity [{}]",
                  entityId,
                  grownBuffer.size)
                val bufferedState = s.copy(
                  out = s.out.updated(entityOutKey, out.copy(buffered = grownBuffer)),
                  replyAfterStore = newReplyAfterStore)
                // publish an updated RequestNext so the producer sees the changed buffer usage
                s.producer ! createRequestNext(bufferedState)
                bufferedState
            }
        }

      active(nextState)
    }

    // Processes a confirmation for one entity: every entry of `outState.unconfirmed` whose
    // sequence number is <= `confirmedSeqNr` counts as confirmed. Each confirmed entry that
    // carries a `replyTo` gets a `Done`, and when a durable queue is configured the total
    // sequence number of the last confirmed entry is stored.
    //
    // Returns the entries that remain unconfirmed. The state itself is not modified here.
    def onAck(outState: OutState[A], confirmedSeqNr: OutSeqNr): Vector[Unconfirmed[A]] = {
      val (acked, stillPending) = outState.unconfirmed.partition {
        case Unconfirmed(_, seqNr, _) => seqNr <= confirmedSeqNr
      }

      // entries without a replyTo need no reply
      acked.foreach {
        case Unconfirmed(_, _, replyTo) => replyTo.foreach(_ ! Done)
      }

      // Storing the confirmedSeqNr can be "write behind", at-least-once delivery.
      // Nothing is stored when no entry was confirmed or no durable queue is in use.
      for {
        lastAcked <- acked.lastOption
        queue <- durableQueue
      } queue ! StoreMessageConfirmed(lastAcked.totalSeqNr, outState.entityId, System.currentTimeMillis())

      stillPending
    }

    // Called once the durable queue has stored a message. If a confirmation reply was
    // registered in `s.replyAfterStore` for `seqNr` it is answered with `Done` now; the message
    // is then handed to `onMessage` without a replyTo and with that registration removed.
    def receiveStoreMessageSentCompleted(
        seqNr: SeqNr,
        msg: A,
        entityId: ConfirmationQualifier): Behavior[InternalCommand] = {
      s.replyAfterStore.get(seqNr) match {
        case Some(replyTo) =>
          context.log.info("Confirmation reply to [{}] after storage", seqNr)
          replyTo ! Done
        case None => // no confirmation reply registered for this seqNr
      }

      onMessage(entityId, msg, replyTo = None, seqNr, s.replyAfterStore - seqNr)
    }

    // Handles a failed attempt to store a message in the durable queue.
    //
    // While `f.attempt` is below `producerControllerSettings.durableQueueRetryAttempts` the
    // store is retried with an incremented attempt counter and the behavior stays the same.
    // Once the limit is reached the failure is logged and a TimeoutException is thrown.
    def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = {
      if (f.attempt >= producerControllerSettings.durableQueueRetryAttempts) {
        val errorMessage =
          s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up."
        context.log.error(errorMessage)
        throw new TimeoutException(errorMessage)
      } else {
        // plain literal: the `{}` placeholders are filled in by the logger, nothing to interpolate
        context.log.info("StoreMessageSent seqNr [{}] failed, attempt [{}], retrying.", f.messageSent.seqNr, f.attempt)
        // retry
        storeMessageSent(f.messageSent, attempt = f.attempt + 1)
        Behaviors.same
      }
    }

    // Applies an Ack to the OutState registered under `ack.outKey`: confirmed entries are
    // processed by `onAck` and the remaining unconfirmed entries are stored in the new state.
    // An Ack for an out key without registered state is reported as unhandled.
    def receiveAck(ack: Ack): Behavior[InternalCommand] = {
      val key = ack.outKey
      s.out.get(key) match {
        case None =>
          // obsolete Ack, ConsumerController already deregistered
          Behaviors.unhandled

        case Some(outState) =>
          if (traceEnabled)
            context.log.trace2("Received Ack, confirmed [{}], current [{}].", ack.confirmedSeqNr, s.currentSeqNr)
          val remaining = onAck(outState, ack.confirmedSeqNr)
          // the usage timestamp is only refreshed when this Ack changed the number of
          // unconfirmed entries
          val somethingConfirmed = remaining.size != outState.unconfirmed.size
          val usedAt = if (somethingConfirmed) System.nanoTime() else outState.usedNanoTime
          val ackedOut = outState.copy(unconfirmed = remaining, usedNanoTime = usedAt)
          active(s.copy(out = s.out.updated(key, ackedOut)))
      }
    }

    // Handles a RequestNext coming from one of the ProducerControllers (identified by the
    // `producerId` of the request, which is used as the out key).
    //
    // The `confirmedSeqNr` carried by the request is first processed via `onAck`. Then either
    // the first buffered message of that entity is sent using the new demand, or, with an empty
    // buffer, the demand is remembered in `nextTo` and an updated RequestNext is sent to the
    // producer.
    //
    // Throws IllegalStateException if demand is already registered for the out key.
    def receiveWrappedRequestNext(w: WrappedRequestNext[A]): Behavior[InternalCommand] = {
      val demand = w.next
      val outKey = demand.producerId
      s.out.get(outKey) match {
        case None =>
          // a RequestNext may have been in flight when the ProducerController was stopped,
          // but that is not expected to happen in practice
          context.log.warn("Received RequestNext for unknown [{}]", outKey)
          Behaviors.same

        case Some(out) =>
          if (out.nextTo.nonEmpty)
            throw new IllegalStateException(s"Received RequestNext but already has demand for [$outKey]")

          val confirmedSeqNr = demand.confirmedSeqNr
          if (traceEnabled)
            context.log.trace("Received RequestNext from [{}], confirmed seqNr [{}]", out.entityId, confirmedSeqNr)
          val stillUnconfirmed = onAck(out, confirmedSeqNr)

          out.buffered.headOption match {
            case Some(oldest) =>
              // use the demand right away for the first buffered message
              send(oldest.msg, outKey, out.seqNr, demand)
              val sentOut = out.copy(
                seqNr = out.seqNr + 1,
                nextTo = None,
                unconfirmed = stillUnconfirmed :+ Unconfirmed(oldest.totalSeqNr, out.seqNr, oldest.replyTo),
                buffered = out.buffered.tail,
                usedNanoTime = System.nanoTime())
              active(s.copy(out = s.out.updated(outKey, sentOut)))

            case None =>
              // nothing buffered: keep the demand for the next message to this entity
              val waitingOut =
                out.copy(nextTo = Some(demand), unconfirmed = stillUnconfirmed, usedNanoTime = System.nanoTime())
              val newState = s.copy(out = s.out.updated(outKey, waitingOut))
              // send an updated RequestNext
              s.producer ! createRequestNext(newState)
              active(newState)
          }
      }
    }

    // Registers the producer given in `start`: after the local-producer check it is sent a
    // RequestNext built from the current state and then stored as `producer` in the new state.
    def receiveStart(start: Start[A]): Behavior[InternalCommand] = {
      val newProducer = start.producer
      ProducerControllerImpl.enforceLocalProducer(newProducer)
      context.log.debug("Register new Producer [{}], currentSeqNr [{}].", newProducer, s.currentSeqNr)
      newProducer ! createRequestNext(s)
      active(s.copy(producer = newProducer))
    }

    // Sends ResendFirstUnconfirmed to the ProducerController of every entity that has
    // unconfirmed messages and whose `usedNanoTime` is at least
    // `settings.resendFirstUnconfirmedIdleTimeout` in the past. The state is not changed.
    def receiveResendFirstUnconfirmed(): Behavior[InternalCommand] = {
      val checkedAt = System.nanoTime()
      val idleTimeoutMillis = settings.resendFirstUnconfirmedIdleTimeout.toMillis
      s.out.foreach {
        case (outKey: OutKey, outState) =>
          // nanoseconds -> whole milliseconds
          val idleDurationMillis = (checkedAt - outState.usedNanoTime) / 1000000L
          val needsResend = outState.unconfirmed.nonEmpty && idleDurationMillis >= idleTimeoutMillis
          if (needsResend) {
            context.log.debug(
              "Resend first unconfirmed for [{}], because it was idle for [{} ms]",
              outKey,
              idleDurationMillis)
            val controller =
              outState.producerController.unsafeUpcast[ProducerControllerImpl.InternalCommand]
            controller ! ProducerControllerImpl.ResendFirstUnconfirmed
          }
      }
      Behaviors.same
    }

    // Stops and forgets the ProducerControllers that are no longer in use. An entry qualifies
    // when it has neither unconfirmed nor buffered messages and its `usedNanoTime` is at least
    // `settings.cleanupUnusedAfter` in the past. Returns the same behavior when nothing was
    // removed.
    def receiveCleanupUnused(): Behavior[InternalCommand] = {
      val checkedAt = System.nanoTime()
      val unusedAfterMillis = settings.cleanupUnusedAfter.toMillis

      def idleMillis(outState: OutState[A]): Long =
        (checkedAt - outState.usedNanoTime) / 1000 / 1000

      val removeOutKeys =
        s.out.collect {
          case (outKey: OutKey, outState)
              if outState.unconfirmed.isEmpty && outState.buffered.isEmpty &&
              idleMillis(outState) >= unusedAfterMillis =>
            context.log.debug("Cleanup unused [{}], because it was idle for [{} ms]", outKey, idleMillis(outState))
            context.stop(outState.producerController)
            outKey
        }

      if (removeOutKeys.nonEmpty)
        active(s.copy(out = s.out -- removeOutKeys))
      else
        Behaviors.same
    }

    // Message dispatch for the active state. Each case either delegates to one of the local
    // handlers above or handles the message inline; unknown messages are treated as a bug.
    Behaviors.receiveMessage {

      case msg: Msg[A @unchecked] =>
        if (durableQueue.isEmpty) {
          // currentSeqNr is only updated when durableQueue is enabled
          onMessage(msg.envelope.entityId, msg.envelope.message, None, s.currentSeqNr, s.replyAfterStore)
        } else if (msg.isAlreadyStored) {
          // loaded from durable queue: the message carries its stored seqNr (msg.alreadyStored),
          // so currentSeqNr is not advanced here
          onMessage(msg.envelope.entityId, msg.envelope.message, None, msg.alreadyStored, s.replyAfterStore)
        } else {
          // store first; delivery continues when StoreMessageSentCompleted arrives
          storeMessageSent(
            MessageSent(
              s.currentSeqNr,
              msg.envelope.message,
              ack = false,
              msg.envelope.entityId,
              System.currentTimeMillis()),
            attempt = 1)
          active(s.copy(currentSeqNr = s.currentSeqNr + 1))
        }

      case MessageWithConfirmation(entityId, message: A, replyTo) =>
        if (durableQueue.isEmpty) {
          onMessage(entityId, message, Some(replyTo), s.currentSeqNr, s.replyAfterStore)
        } else {
          storeMessageSent(
            MessageSent(s.currentSeqNr, message, ack = true, entityId, System.currentTimeMillis()),
            attempt = 1)
          // remember who to reply to once the message has been stored
          val newReplyAfterStore = s.replyAfterStore.updated(s.currentSeqNr, replyTo)
          active(s.copy(currentSeqNr = s.currentSeqNr + 1, replyAfterStore = newReplyAfterStore))
        }

      case StoreMessageSentCompleted(MessageSent(seqNr, msg: A, _, entityId, _)) =>
        receiveStoreMessageSentCompleted(seqNr, msg, entityId)

      case f: StoreMessageSentFailed[A @unchecked] =>
        receiveStoreMessageSentFailed(f)

      case ack: Ack =>
        receiveAck(ack)

      case w: WrappedRequestNext[A @unchecked] =>
        receiveWrappedRequestNext(w)

      case ResendFirstUnconfirmed =>
        receiveResendFirstUnconfirmed()

      case CleanupUnused =>
        receiveCleanupUnused()

      case start: Start[A @unchecked] =>
        receiveStart(start)

      case AskTimeout(outKey, outSeqNr) =>
        // only logged here; no state change
        context.log.debug(
          "Message seqNr [{}] sent to entity [{}] timed out. It will be redelivered.",
          outSeqNr,
          outKey)
        Behaviors.same

      case DurableQueueTerminated =>
        throw new IllegalStateException("DurableQueue was unexpectedly terminated.")

      case unexpected =>
        throw new RuntimeException(s"Unexpected message: $unexpected")
    }
  }