def apply()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala [230:443]


  /**
   * Creates the initial behavior for a client connection.
   *
   * The behavior starts in [[clientConnect]] with empty bookkeeping: nothing stashed, no publishers,
   * no active consumers or producers and no pending publications.
   *
   * @param connect the connect request the connection is created for
   * @param local promise handed to [[clientConnect]], which completes it with [[ForwardConnect]]
   * @param consumerPacketRouter router reference stored in the state data
   * @param producerPacketRouter router reference stored in the state data
   * @param publisherPacketRouter router reference stored in the state data
   * @param unpublisherPacketRouter router reference stored in the state data
   * @param settings session settings stored in the state data
   * @return the behavior produced by [[clientConnect]] for the initial data
   */
  def apply(connect: Connect,
      local: Promise[ForwardConnect.type],
      consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
      unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
      settings: MqttSessionSettings)(implicit mat: Materializer): Behavior[Event] = {
    // Named arguments keep the many same-typed empty collections from being mixed up.
    val initialData = ConnectReceived(
      connect = connect,
      local = local,
      stash = Vector.empty,
      publishers = Set.empty,
      activeConsumers = Map.empty,
      activeProducers = Map.empty,
      pendingLocalPublications = Vector.empty,
      pendingRemotePublications = Vector.empty,
      consumerPacketRouter = consumerPacketRouter,
      producerPacketRouter = producerPacketRouter,
      publisherPacketRouter = publisherPacketRouter,
      unpublisherPacketRouter = unpublisherPacketRouter,
      settings = settings)

    clientConnect(initialData)
  }

  // Our FSM data, FSM events and commands emitted by the FSM

  /**
   * Base class of the FSM data. Every concrete data variant carries these fields and
   * adds whatever is specific to its state.
   *
   * @param stash events set aside for later; `clientConnect` appends every event it does not
   *              handle itself and hands the stash on when it leaves the state
   * @param publishers strings matched against a publish's topic name via `Topics.filter`
   *                   (presumably topic filters - confirm)
   * @param activeConsumers consumer actors keyed by a string (presumably the topic name - confirm)
   * @param activeProducers producer actors keyed by a string (presumably the topic name - confirm);
   *                        `clientConnect` sends each of them `Producer.ReceiveConnect` on ConnAck
   * @param pendingLocalPublications locally received publishes paired with a string
   *                                 (presumably the topic name - confirm)
   * @param pendingRemotePublications remotely received publishes paired with a string
   *                                  (presumably the topic name - confirm)
   * @param consumerPacketRouter router reference carried along between states
   * @param producerPacketRouter router reference carried along between states
   * @param publisherPacketRouter router reference carried along between states
   * @param unpublisherPacketRouter router reference carried along between states
   * @param settings session settings; `clientConnect` reads `receiveConnAckTimeout` and
   *                 `serverSendBufferSize` from it
   */
  sealed abstract class Data(val stash: Seq[Event],
      val publishers: Set[String],
      val activeConsumers: Map[String, ActorRef[Consumer.Event]],
      val activeProducers: Map[String, ActorRef[Producer.Event]],
      val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
      val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
      val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      val publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
      val unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
      val settings: MqttSessionSettings)
  /**
   * FSM data used by `clientConnect`: a connect has been received and no connection
   * acknowledgement has been produced yet.
   *
   * In addition to the fields shared through `Data` it holds:
   *
   * @param connect the connect request this connection was created for
   * @param local promise that `clientConnect` completes with `ForwardConnect` when the state is entered
   */
  final case class ConnectReceived(
      connect: Connect,
      local: Promise[ForwardConnect.type],
      override val stash: Seq[Event],
      override val publishers: Set[String],
      override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
      override val activeProducers: Map[String, ActorRef[Producer.Event]],
      override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
      override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
      override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      override val publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
      override val unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
      override val settings: MqttSessionSettings) extends Data(
        stash,
        publishers,
        activeConsumers,
        activeProducers,
        pendingLocalPublications,
        pendingRemotePublications,
        consumerPacketRouter,
        producerPacketRouter,
        publisherPacketRouter,
        unpublisherPacketRouter,
        settings)
  /**
   * FSM data built by `clientConnect` once a `ConnAckReceivedLocally` event arrives and
   * passed on to `clientConnected`.
   *
   * In addition to the fields shared through `Data` it holds:
   *
   * @param connect the connect request carried over from `ConnectReceived`
   * @param remote queue of `ForwardConnAckCommand`s; `clientConnect` creates it with
   *               `Source.queue` and offers `ForwardConnAck` as its first element
   */
  final case class ConnAckReplied(
      connect: Connect,
      remote: SourceQueueWithComplete[ForwardConnAckCommand],
      override val stash: Seq[Event],
      override val publishers: Set[String],
      override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
      override val activeProducers: Map[String, ActorRef[Producer.Event]],
      override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
      override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
      override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      override val publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
      override val unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
      override val settings: MqttSessionSettings) extends Data(
        stash,
        publishers,
        activeConsumers,
        activeProducers,
        pendingLocalPublications,
        pendingRemotePublications,
        consumerPacketRouter,
        producerPacketRouter,
        publisherPacketRouter,
        unpublisherPacketRouter,
        settings)
  /**
   * FSM data variant that carries only the fields shared through `Data`: it has neither a
   * connect request nor a handle to the remote side.
   *
   * NOTE(review): the state using this data is not visible in this section; presumably it is
   * the data held while no client connection is active - confirm against the handlers.
   */
  final case class Disconnected(
      override val stash: Seq[Event],
      override val publishers: Set[String],
      override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
      override val activeProducers: Map[String, ActorRef[Producer.Event]],
      override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
      override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
      override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      override val publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
      override val unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
      override val settings: MqttSessionSettings) extends Data(
        stash,
        publishers,
        activeConsumers,
        activeProducers,
        pendingLocalPublications,
        pendingRemotePublications,
        consumerPacketRouter,
        producerPacketRouter,
        publisherPacketRouter,
        unpublisherPacketRouter,
        settings)

  /**
   * Events handled by the FSM. The hierarchy is sealed, so all events are declared here.
   * Several events carry a `Promise` named `local` or `remote`; the handlers complete it
   * to hand a command (or a source of commands) back to whoever raised the event.
   */
  sealed abstract class Event
  // Timer message scheduled by `clientConnect` with `settings.receiveConnAckTimeout`;
  // `clientConnect` fails with `ClientConnectionFailed` when it fires.
  case object ReceiveConnAckTimeout extends Event
  // `clientConnect` completes `remote` with the source of commands it materialises.
  final case class ConnAckReceivedLocally(connAck: ConnAck, remote: Promise[Source[ForwardConnAckCommand, NotUsed]])
      extends Event
  final case class SubscribeReceivedFromRemote(subscribe: Subscribe, local: Promise[Publisher.ForwardSubscribe.type])
      extends Event
  final case class Subscribed(subscribe: Subscribe) extends Event
  final case class PublishReceivedFromRemote(publish: Publish, local: Promise[Consumer.ForwardPublish.type])
      extends Event
  final case class ConsumerFree(topicName: String) extends Event
  // Ignored by `clientConnect` when no entry of `publishers` matches `publish.topicName`.
  final case class PublishReceivedLocally(publish: Publish, publishData: Producer.PublishData) extends Event
  final case class ProducerFree(topicName: String) extends Event
  final case class UnsubscribeReceivedFromRemote(unsubscribe: Unsubscribe,
      local: Promise[Unpublisher.ForwardUnsubscribe.type])
      extends Event
  final case class Unsubscribed(unsubscribe: Unsubscribe) extends Event
  final case class PingReqReceivedFromRemote(local: Promise[ForwardPingReq.type]) extends Event
  final case class DisconnectReceivedFromRemote(local: Promise[ForwardDisconnect.type]) extends Event
  // `clientConnect` fails with `ClientConnectionFailed` on this event.
  case object ConnectionLost extends Event
  case object ReceivePingReqTimeout extends Event
  final case class ReceivedProducerPublishingCommand(command: Producer.ForwardPublishingCommand) extends Event
  final case class ConnectReceivedFromRemote(connect: Connect, local: Promise[ClientConnection.ForwardConnect.type])
      extends Event
  case object ReceiveConnectTimeout extends Event
  // Wraps the outcome of a queue offer: `Left` holds the failure, `Right` the offer result.
  final case class QueueOfferCompleted(result: Either[Throwable, QueueOfferResult])
      extends Event
      with QueueOfferState.QueueOfferCompleted

  /**
   * Commands emitted by the FSM. There are two separate hierarchies: `Command` values are used
   * to complete the `local` promises carried by events, while `ForwardConnAckCommand` values
   * are the elements of the queue/source created in `clientConnect`.
   */
  sealed abstract class Command
  // Completes the `local` promise of `ConnectReceived` when `clientConnect` is entered.
  case object ForwardConnect extends Command
  sealed abstract class ForwardConnAckCommand
  // First element `clientConnect` offers to the queue of `ForwardConnAckCommand`s.
  case object ForwardConnAck extends ForwardConnAckCommand
  case object ForwardPingReq extends Command
  case object ForwardPingResp extends ForwardConnAckCommand
  case object ForwardDisconnect extends Command
  final case class ForwardPublish(publish: Publish, packetId: Option[PacketId]) extends ForwardConnAckCommand
  final case class ForwardPubRel(packetId: PacketId) extends ForwardConnAckCommand

  // State event handling

  // Name prefixes for consumers and producers.
  // NOTE(review): not used in this section; presumably prepended to the names of spawned
  // consumer/producer child actors - confirm against the handlers that use them.
  private val ConsumerNamePrefix = "consumer-"
  private val ProducerNamePrefix = "producer-"

  /**
   * State entered once a connect has been received: forwards the connect and waits for the
   * connection acknowledgement to be produced locally.
   *
   * On entry the `local` promise is completed with [[ForwardConnect]] and, unless it is already
   * running, a single timer is started that delivers [[ReceiveConnAckTimeout]] after
   * `settings.receiveConnAckTimeout`.
   *
   * Event handling:
   *  - [[ConnAckReceivedLocally]]: materialises the queue/source of [[ForwardConnAckCommand]]s,
   *    completes the event's promise with the source, cancels the timer, sends
   *    `Producer.ReceiveConnect` to every active producer, offers [[ForwardConnAck]] to the queue
   *    and continues with `clientConnected` once the offer has completed.
   *  - [[ReceiveConnAckTimeout]] or [[ConnectionLost]]: throws `ClientConnectionFailed`.
   *  - [[PublishReceivedLocally]] for a topic that no entry of `publishers` matches: ignored.
   *  - anything else: appended to the stash, after which this state is re-entered.
   *
   * `Terminated` signals are ignored.
   *
   * @param data the state data, including the stash accumulated so far
   * @param mat materializer used to run the command queue
   * @return the behavior for this state
   */
  def clientConnect(data: ConnectReceived)(implicit mat: Materializer): Behavior[Event] = Behaviors.setup { ctx =>
    ctx.log.debug("clientConnect stash={}", data.stash)
    // This state is re-entered for every stashed event, so the promise may already be completed.
    data.local.trySuccess(ForwardConnect)

    Behaviors.withTimers { timers =>
      val connAckTimerKey = "receive-connack"
      val connAckTimerRunning = timers.isTimerActive(connAckTimerKey)
      // Re-entering the state must not restart a timeout that is already ticking.
      if (!connAckTimerRunning)
        timers.startSingleTimer(connAckTimerKey, ReceiveConnAckTimeout, data.settings.receiveConnAckTimeout)

      Behaviors
        .receivePartial[Event] {
          case (_, ConnAckReceivedLocally(_, remotePromise)) =>
            val commandQueueSource =
              Source.queue[ForwardConnAckCommand](data.settings.serverSendBufferSize, OverflowStrategy.backpressure)
            val (sendQueue, commandSource) =
              commandQueueSource.toMat(BroadcastHub.sink)(Keep.both).run()

            remotePromise.success(commandSource)

            timers.cancel(connAckTimerKey)

            for (producer <- data.activeProducers.values)
              producer ! Producer.ReceiveConnect

            QueueOfferState.waitForQueueOfferCompleted(
              sendQueue.offer(ForwardConnAck),
              offered => QueueOfferCompleted(offered.toEither),
              clientConnected(
                ConnAckReplied(
                  connect = data.connect,
                  remote = sendQueue,
                  // The stash accumulated here is handed over separately below.
                  stash = Vector.empty,
                  publishers = data.publishers,
                  activeConsumers = data.activeConsumers,
                  activeProducers = data.activeProducers,
                  pendingLocalPublications = data.pendingLocalPublications,
                  pendingRemotePublications = data.pendingRemotePublications,
                  consumerPacketRouter = data.consumerPacketRouter,
                  producerPacketRouter = data.producerPacketRouter,
                  publisherPacketRouter = data.publisherPacketRouter,
                  unpublisherPacketRouter = data.unpublisherPacketRouter,
                  settings = data.settings)),
              stash = data.stash)

          case (_, ReceiveConnAckTimeout | ClientConnection.ConnectionLost) =>
            throw ClientConnectionFailed

          case (_, PublishReceivedLocally(publish, _))
              if data.publishers.forall(topicFilter => !Topics.filter(topicFilter, publish.topicName)) =>
            // Nobody has subscribed to this topic, so there is nothing to keep.
            Behaviors.same

          case (_, other) =>
            val grownStash = data.stash :+ other
            clientConnect(data.copy(stash = grownStash))
        }
        .receiveSignal {
          case (_, _: Terminated) =>
            Behaviors.same
        }
    }
  }