private def active()

in actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala [300:676]


  private def active(s: State[A]): Behavior[InternalCommand] = {

    /**
     * Tries to hand `msg` over to a worker and returns the state that results from the attempt.
     *
     * With a durable queue the worker must already have been preselected for `totalSeqNr`; if that
     * worker is no longer present in `s.out` the message is sent back to self as a
     * `ResendDurableMsg` and the state is returned unchanged. Without a durable queue a worker is
     * picked through `selectWorker()`; when none is available the message is stashed.
     *
     * When a worker is found the message is recorded as unconfirmed for it, the worker's
     * `askNextTo` is cleared, the message is sent with `context.ask`, and the `requested` flag is
     * recomputed (which may send `requestNext` to the producer).
     *
     * @param msg        the message to deliver
     * @param wasStashed flag carried with the message; feeds the guard below and the decision table
     * @param replyTo    optional recipient kept with the unconfirmed entry (`onAck` sends `Done` to it)
     * @param totalSeqNr sequence number used for logging, the unconfirmed entry and the
     *                   preselected-worker lookup
     * @return the updated state, or the state produced by the stash / resend fallback
     * @throws IllegalStateException if the message was neither requested nor stashed and there is no
     *                               durable queue, if a durable queue is defined but no worker was
     *                               preselected for `totalSeqNr`, or if the flag combination is not
     *                               covered by the decision table
     */
    def onMessage(msg: A, wasStashed: Boolean, replyTo: Option[ActorRef[Done]], totalSeqNr: TotalSeqNr): State[A] = {
      // snapshot of the demand taken from `s`, i.e. before this message is assigned to a worker
      val consumersWithDemand = s.out.iterator.filter { case (_, out) => out.askNextTo.isDefined }.toVector
      if (traceEnabled)
        context.log.traceN(
          "Received message seqNr [{}], wasStashed [{}], consumersWithDemand [{}], hasRequested [{}].",
          totalSeqNr,
          wasStashed,
          consumersWithDemand.map(_._1).mkString(", "),
          s.requested)
      // without a durable queue a message is only acceptable if it was requested or comes from the stash
      if (!s.requested && !wasStashed && durableQueue.isEmpty)
        throw new IllegalStateException(s"Unexpected message [$msg], wasn't requested nor unstashed.")

      // Right(worker entry) when delivery can proceed, Left(state to return) when it cannot
      val selectedWorker =
        if (durableQueue.isDefined) {
          s.preselectedWorkers.get(totalSeqNr) match {
            case Some(PreselectedWorker(outKey, confirmationQualifier)) =>
              s.out.get(outKey) match {
                case Some(out) => Right(outKey -> out)
                case None      =>
                  // the preselected was deregistered in the meantime
                  context.self ! ResendDurableMsg(msg, confirmationQualifier, totalSeqNr)
                  Left(s)
              }
            case None =>
              throw new IllegalStateException(s"Expected preselected worker for seqNr [$totalSeqNr].")
          }
        } else {
          selectWorker() match {
            case Some(w) => Right(w)
            case None =>
              checkStashFull(stashBuffer)
              context.log.debug("Stashing message, seqNr [{}]", totalSeqNr)
              stashBuffer.stash(Msg(msg, wasStashed = true, replyTo))
              // a message that was not stashed before clears `requested`; a re-stashed one keeps it as is
              val newRequested = if (wasStashed) s.requested else false
              Left(s.copy(requested = newRequested))
          }
        }

      selectedWorker match {
        case Right((outKey, out)) =>
          val newUnconfirmed = out.unconfirmed :+ Unconfirmed(totalSeqNr, out.seqNr, msg, replyTo)
          val newOut = s.out.updated(outKey, out.copy(unconfirmed = newUnconfirmed, askNextTo = None))
          implicit val askTimeout: Timeout = workerAskTimeout
          // NOTE(review): `.get` relies on the selected worker having `askNextTo` defined; this holds for
          // `selectWorker()` results, presumably also for preselected workers - confirm
          context.ask[ProducerController.MessageWithConfirmation[A], OutSeqNr](
            out.askNextTo.get,
            ProducerController.MessageWithConfirmation(msg, _)) {
            // the ask outcome is turned into a message handled by this actor
            case Success(seqNr) => Ack(outKey, seqNr)
            case Failure(_)     => AskTimeout(outKey, out.seqNr)
          }

          def tellRequestNext(): Unit = {
            if (traceEnabled)
              context.log.trace("Sending RequestNext to producer, seqNr [{}].", totalSeqNr)
            s.producer ! requestNext
          }

          // at least two workers had demand before this message was assigned; the selected worker is
          // expected to be one of them, so demand remains after this delivery
          val hasMoreDemand = consumersWithDemand.size >= 2
          // decision table based on s.hasRequested, wasStashed, hasMoreDemand, stashBuffer.isEmpty
          val newRequested =
            if (s.requested && !wasStashed && hasMoreDemand) {
              // request immediately since more demand
              tellRequestNext()
              true
            } else if (s.requested && !wasStashed && !hasMoreDemand) {
              // wait until more demand
              false
            } else if (!s.requested && wasStashed && hasMoreDemand && stashBuffer.isEmpty) {
              // msg was unstashed, the last from stash
              tellRequestNext()
              true
            } else if (!s.requested && wasStashed && hasMoreDemand && stashBuffer.nonEmpty) {
              // more in stash
              false
            } else if (!s.requested && wasStashed && !hasMoreDemand) {
              // wait until more demand
              false
            } else if (s.requested && wasStashed) {
              // msg was unstashed, but pending request already in progress
              true
            } else if (durableQueue.isDefined && !s.requested && !wasStashed) {
              // msg ResendDurableMsg, and stashed before storage
              false
            } else {
              throw new IllegalStateException(s"Invalid combination of hasRequested [${s.requested}], " +
                s"wasStashed [$wasStashed], hasMoreDemand [$hasMoreDemand], stashBuffer.isEmpty [${stashBuffer.isEmpty}]")
            }

          // the preselection for this seqNr (if any) has been used and is dropped
          s.copy(out = newOut, requested = newRequested, preselectedWorkers = s.preselectedWorkers - totalSeqNr)

        case Left(newState) =>
          newState
      }
    }

    // Entries of `s.out` whose `askNextTo` is currently defined, in the map's iteration order.
    def workersWithDemand: Vector[(OutKey, OutState[A])] =
      s.out.iterator.collect { case entry @ (_, out) if out.askNextTo.nonEmpty => entry }.toVector

    /**
     * Picks, uniformly at random, one worker that has demand and is not already reserved in
     * `s.preselectedWorkers`.
     *
     * @return the chosen `(key, state)` entry, or `None` when no such worker exists
     */
    def selectWorker(): Option[(OutKey, OutState[A])] = {
      val reservedKeys = s.preselectedWorkers.valuesIterator.map(_.outKey).toSet
      val candidates = workersWithDemand.filter { case (key, _) => !reservedKeys.contains(key) }
      if (candidates.nonEmpty) {
        val pick = ThreadLocalRandom.current().nextInt(candidates.size)
        Some(candidates(pick))
      } else {
        None
      }
    }

    /**
     * Handles a new message before it has been stored in the durable queue.
     *
     * If a worker can be selected, storage of a `MessageSent` for the current sequence number is
     * started, the worker is remembered as preselected for that sequence number, an optional
     * `replyTo` is remembered for the reply after storage, and the sequence number is advanced.
     * Otherwise the message is stashed with `wasStashed = false`.
     *
     * @param msg     the message to store and later deliver
     * @param replyTo optional recipient to answer once the message has been stored
     * @return the next `active` behavior
     */
    def onMessageBeforeDurableQueue(msg: A, replyTo: Option[ActorRef[Done]]): Behavior[InternalCommand] = {
      val seqNr = s.currentSeqNr
      selectWorker() match {
        case None =>
          checkStashFull(stashBuffer)
          // no demand from any workers, or all already preselected
          context.log.debug("Stash before storage, seqNr [{}]", seqNr)
          // not stored yet, so don't treat it as stashed
          stashBuffer.stash(Msg(msg, wasStashed = false, replyTo))
          active(s)

        case Some((workerKey, worker)) =>
          val qualifier = worker.confirmationQualifier
          storeMessageSent(
            MessageSent(seqNr, msg, ack = replyTo.isDefined, qualifier, System.currentTimeMillis()),
            attempt = 1)
          // only messages that asked for a confirmation get an entry
          val pendingReplies = replyTo.fold(s.replyAfterStore)(s.replyAfterStore.updated(seqNr, _))
          active(
            s.copy(
              currentSeqNr = seqNr + 1,
              preselectedWorkers = s.preselectedWorkers.updated(seqNr, PreselectedWorker(workerKey, qualifier)),
              replyAfterStore = pendingReplies))
      }
    }

    /**
     * Re-routes a durable message whose original worker is gone.
     *
     * The message is stored again under the current sequence number for a newly selected worker,
     * and the old qualifier / sequence number are kept in `handOver` under the new sequence number.
     * When no worker can be selected the resend request itself is stashed.
     *
     * @param resend the message together with its previous confirmation qualifier and sequence number
     * @return the next `active` behavior
     * @throws IllegalArgumentException (via `require`) when no durable queue is defined
     */
    def onResendDurableMsg(resend: ResendDurableMsg[A]): Behavior[InternalCommand] = {
      require(durableQueue.isDefined, "Unexpected ResendDurableMsg when DurableQueue not defined.")
      val seqNr = s.currentSeqNr
      selectWorker() match {
        case None =>
          checkStashFull(stashBuffer)
          // no demand from any workers, or all already preselected
          context.log.debug("Stash before storage of resent durable message, seqNr [{}].", seqNr)
          // not stored yet, so don't treat it as stashed
          stashBuffer.stash(resend)
          active(s)

        case Some((workerKey, worker)) =>
          val qualifier = worker.confirmationQualifier
          storeMessageSent(
            MessageSent(seqNr, resend.msg, ack = false, qualifier, System.currentTimeMillis()),
            attempt = 1)
          // When StoreMessageSentCompleted (oldConfirmationQualifier, oldSeqNr) confirmation will be stored
          val previous = HandOver(resend.oldConfirmationQualifier, resend.oldSeqNr)
          active(
            s.copy(
              currentSeqNr = seqNr + 1,
              preselectedWorkers = s.preselectedWorkers.updated(seqNr, PreselectedWorker(workerKey, qualifier)),
              handOver = s.handOver.updated(seqNr, previous)))
      }
    }

    /**
     * Continues delivery of a message once the durable queue has stored it.
     *
     * Replies `Done` to a recipient registered in `replyAfterStore` for `seqNr`, stores the
     * confirmation of the previous (qualifier, seqNr) pair when this message was handed over from
     * another worker, then delivers the message through `onMessage` and drops the bookkeeping
     * entries for `seqNr`.
     *
     * @param seqNr sequence number of the stored message
     * @param m     the stored message
     */
    def receiveStoreMessageSentCompleted(seqNr: SeqNr, m: A) = {
      for (replyTo <- s.replyAfterStore.get(seqNr)) {
        if (traceEnabled)
          context.log.trace("Sending reply for seqNr [{}] after storage.", seqNr)
        replyTo ! Done
      }

      val handedOver = s.handOver.get(seqNr)
      handedOver.foreach {
        case HandOver(oldConfirmationQualifier, oldSeqNr) =>
          durableQueue.foreach { queue =>
            queue ! StoreMessageConfirmed(oldSeqNr, oldConfirmationQualifier, System.currentTimeMillis())
          }
      }

      // a handed-over message is delivered with the stashed flag set
      val delivered = onMessage(m, wasStashed = handedOver.isDefined, replyTo = None, seqNr)
      active(delivered.copy(replyAfterStore = delivered.replyAfterStore - seqNr, handOver = delivered.handOver - seqNr))
    }

    /**
     * Applies an `Ack` to the worker it refers to, pruning that worker's unconfirmed messages.
     * An ack for a key that is no longer in `s.out` is left unhandled.
     */
    def receiveAck(ack: Ack): Behavior[InternalCommand] = {
      val key = ack.outKey
      s.out.get(key).fold[Behavior[InternalCommand]] {
        // obsolete Next, ConsumerController already deregistered
        Behaviors.unhandled
      } { worker =>
        val remaining = onAck(worker, ack.confirmedSeqNr)
        active(s.copy(out = s.out.updated(key, worker.copy(unconfirmed = remaining))))
      }
    }

    /**
     * Splits a worker's unconfirmed messages at `confirmedSeqNr`.
     *
     * For the confirmed part, `Done` is sent to every entry that carries a `replyTo`, and, when a
     * durable queue is defined, a `StoreMessageConfirmed` for the last confirmed entry is sent to it.
     *
     * @param outState       state of the worker the confirmation refers to
     * @param confirmedSeqNr highest per-worker sequence number that is confirmed
     * @return the entries that are still unconfirmed
     */
    def onAck(outState: OutState[A], confirmedSeqNr: OutSeqNr): Vector[Unconfirmed[A]] = {
      val (acked, pending) = outState.unconfirmed.partition {
        case Unconfirmed(_, outSeqNr, _, _) => outSeqNr <= confirmedSeqNr
      }

      if (acked.nonEmpty) {
        if (traceEnabled)
          context.log.trace("Received Ack seqNr [{}] from worker [{}].", confirmedSeqNr, outState.confirmationQualifier)

        // entries without a replyTo need no reply
        acked.collect { case Unconfirmed(_, _, _, Some(replyTo)) => replyTo }.foreach(_ ! Done)

        durableQueue.foreach { queue =>
          // Storing the confirmedSeqNr can be "write behind", at-least-once delivery
          queue ! StoreMessageConfirmed(
            acked.last.totalSeqNr,
            outState.confirmationQualifier,
            System.currentTimeMillis())
        }
      }

      pending
    }

    /**
     * Registers new demand from a worker.
     *
     * The confirmation carried by the request is applied through `onAck`, and the worker's
     * `seqNr` and `askNextTo` are refreshed. Then, in this order of precedence: stashed messages
     * are unstashed; or nothing more happens if a producer request is already outstanding; or
     * `requestNext` is sent to the producer and `requested` is set.
     *
     * A request for a key that is no longer in `s.out` is left unhandled.
     */
    def receiveWorkerRequestNext(w: WorkerRequestNext[A]): Behavior[InternalCommand] = {
      val request = w.next
      val workerKey = request.producerId
      s.out.get(workerKey) match {
        case None =>
          // obsolete Next, ConsumerController already deregistered
          Behaviors.unhandled

        case Some(worker) =>
          val confirmedSeqNr = request.confirmedSeqNr
          if (traceEnabled)
            context.log.trace2(
              "Received RequestNext from worker [{}], confirmedSeqNr [{}].",
              workerKey,
              confirmedSeqNr)

          val stillUnconfirmed = onAck(worker, confirmedSeqNr)
          val refreshedWorker = worker.copy(
            seqNr = request.currentSeqNr,
            unconfirmed = stillUnconfirmed,
            askNextTo = Some(request.askNextTo))
          val withDemand = s.copy(out = s.out.updated(workerKey, refreshedWorker))

          if (stashBuffer.nonEmpty) {
            context.log.debug2("Unstash [{}] after RequestNext from worker [{}]", stashBuffer.size, workerKey)
            stashBuffer.unstashAll(active(withDemand))
          } else if (s.requested) {
            active(withDemand)
          } else {
            if (traceEnabled)
              context.log.trace("Sending RequestNext to producer after RequestNext from worker [{}].", workerKey)
            s.producer ! requestNext
            active(withDemand.copy(requested = true))
          }
      }
    }

    /**
     * Reconciles the registered workers with the set reported in `curr`.
     *
     * For every added worker a child `ProducerController` is spawned under a fresh UUID name,
     * started and told to register the worker. For every removed worker the child is stopped and
     * each of its unconfirmed messages is sent back to self: as `Msg` with `wasStashed = true`
     * when there is no durable queue, otherwise as `ResendDurableMsg`.
     */
    def receiveCurrentWorkers(curr: CurrentWorkers[A]): Behavior[InternalCommand] = {
      // TODO #28722 we could also track unreachable workers and avoid them when selecting worker
      val joined = curr.workers.diff(s.workers)
      val departed = s.workers.diff(curr.workers)

      val afterJoins = joined.foldLeft(s) { (state, worker) =>
        val childName = UUID.randomUUID().toString
        val workerKey = s"$producerId-$childName"
        context.log.debug2("Registered worker [{}], with producerId [{}].", worker, workerKey)
        val controller = context.spawn(
          ProducerController[A](workerKey, durableQueueBehavior = None, producerControllerSettings),
          childName,
          DispatcherSelector.sameAsParent())
        controller ! ProducerController.Start(workerRequestNextAdapter)
        controller ! ProducerController.RegisterConsumer(worker)
        state.copy(out = state.out.updated(workerKey, OutState(controller, worker, 0L, Vector.empty, None)))
      }

      val afterDepartures = departed.foldLeft(afterJoins) { (state, worker) =>
        state.out.find { case (_, registered) => registered.consumerController == worker } match {
          case None =>
            context.log.debug("Deregistered non-existing worker [{}]", worker)
            state

          case Some((workerKey, registered)) =>
            context.log.debug2("Deregistered worker [{}], with producerId [{}].", worker, workerKey)
            context.stop(registered.producerController)
            // resend the unconfirmed, sending to self since order of messages for WorkPulling doesn't matter anyway
            val pending = registered.unconfirmed
            if (pending.nonEmpty)
              context.log.debugN(
                "Resending unconfirmed from deregistered worker with producerId [{}], from seqNr [{}] to [{}].",
                workerKey,
                pending.head.outSeqNr,
                pending.last.outSeqNr)
            pending.foreach {
              case Unconfirmed(totalSeqNr, _, msg, replyTo) =>
                durableQueue match {
                  case None =>
                    context.self ! Msg(msg, wasStashed = true, replyTo)
                  case Some(_) =>
                    context.self ! ResendDurableMsg(msg, registered.confirmationQualifier, totalSeqNr)
                }
            }
            state.copy(out = state.out - workerKey)
        }
      }

      active(afterDepartures.copy(workers = curr.workers))
    }

    /**
     * Switches to a new producer. If a request is already outstanding (`s.requested`), the new
     * producer is immediately sent `requestNext`.
     */
    def receiveStart(start: Start[A]): Behavior[InternalCommand] = {
      val newProducer = start.producer
      ProducerControllerImpl.enforceLocalProducer(newProducer)
      context.log.debug("Register new Producer [{}], currentSeqNr [{}].", newProducer, s.currentSeqNr)
      if (s.requested) {
        newProducer ! requestNext
      }
      active(s.copy(producer = newProducer))
    }

    // Message dispatch for the active state: each case delegates to one of the handlers above or
    // keeps the current behavior.
    Behaviors.receiveMessage {
      case Msg(msg: A, wasStashed, replyTo) =>
        // without a durable queue, or for a message flagged as stashed, deliver right away;
        // otherwise the message goes through the durable-queue path first
        if (durableQueue.isEmpty || wasStashed)
          active(onMessage(msg, wasStashed, replyTo, s.currentSeqNr))
        else
          onMessageBeforeDurableQueue(msg, replyTo)

      case MessageWithConfirmation(msg: A, replyTo) =>
        if (durableQueue.isEmpty)
          active(onMessage(msg, wasStashed = false, Some(replyTo), s.currentSeqNr))
        else
          onMessageBeforeDurableQueue(msg, Some(replyTo))

      case m: ResendDurableMsg[A @unchecked] =>
        onResendDurableMsg(m)

      case StoreMessageSentCompleted(MessageSent(seqNr, m: A, _, _, _)) =>
        receiveStoreMessageSentCompleted(seqNr, m)

      case f: StoreMessageSentFailed[A @unchecked] =>
        receiveStoreMessageSentFailed(f)

      case ack: Ack =>
        receiveAck(ack)

      case w: WorkerRequestNext[A @unchecked] =>
        receiveWorkerRequestNext(w)

      case curr: CurrentWorkers[A @unchecked] =>
        receiveCurrentWorkers(curr)

      case GetWorkerStats(replyTo) =>
        replyTo ! WorkerStats(s.workers.size)
        Behaviors.same

      case RegisterConsumerDone =>
        // nothing to do in this state
        Behaviors.same

      case start: Start[A @unchecked] =>
        receiveStart(start)

      case AskTimeout(outKey, outSeqNr) =>
        // only logged here; the state is left untouched
        context.log.debug(
          "Message seqNr [{}] sent to worker [{}] timed out. It will be redelivered.",
          outSeqNr,
          outKey)
        Behaviors.same

      case DurableQueueTerminated =>
        throw new IllegalStateException("DurableQueue was unexpectedly terminated.")

      case unexpected =>
        throw new RuntimeException(s"Unexpected message: $unexpected")
    }
  }