def apply()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala [57:299]


  def apply(consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      subscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Subscriber.Event]],
      unsubscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Unsubscriber.Event]],
      settings: MqttSessionSettings)(implicit mat: Materializer): Behavior[Event] =
    disconnected(
      Disconnected(
        Vector.empty,
        Map.empty,
        Map.empty,
        Vector.empty,
        Vector.empty,
        consumerPacketRouter,
        producerPacketRouter,
        subscriberPacketRouter,
        unsubscriberPacketRouter,
        settings))

  // Our FSM data, FSM events and commands emitted by the FSM

  sealed abstract class Data(val stash: Seq[Event],
      val activeConsumers: Map[String, ActorRef[Consumer.Event]],
      val activeProducers: Map[String, ActorRef[Producer.Event]],
      val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
      val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
      val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      val subscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Subscriber.Event]],
      val unsubscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Unsubscriber.Event]],
      val settings: MqttSessionSettings)
  final case class Disconnected(
      override val stash: Seq[Event],
      override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
      override val activeProducers: Map[String, ActorRef[Producer.Event]],
      override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
      override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
      override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      override val subscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Subscriber.Event]],
      override val unsubscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Unsubscriber.Event]],
      override val settings: MqttSessionSettings) extends Data(
        stash,
        activeConsumers,
        activeProducers,
        pendingLocalPublications,
        pendingRemotePublications,
        consumerPacketRouter,
        producerPacketRouter,
        subscriberPacketRouter,
        unsubscriberPacketRouter,
        settings)
  final case class ConnectReceived(
      connectionId: ByteString,
      connect: Connect,
      connectData: ConnectData,
      remote: SourceQueueWithComplete[ForwardConnectCommand],
      override val stash: Seq[Event],
      override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
      override val activeProducers: Map[String, ActorRef[Producer.Event]],
      override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
      override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
      override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      override val subscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Subscriber.Event]],
      override val unsubscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Unsubscriber.Event]],
      override val settings: MqttSessionSettings) extends Data(
        stash,
        activeConsumers,
        activeProducers,
        pendingLocalPublications,
        pendingRemotePublications,
        consumerPacketRouter,
        producerPacketRouter,
        subscriberPacketRouter,
        unsubscriberPacketRouter,
        settings)
  final case class ConnAckReceived(
      connectionId: ByteString,
      connectFlags: ConnectFlags,
      keepAlive: FiniteDuration,
      pendingPingResp: Boolean,
      remote: SourceQueueWithComplete[ForwardConnectCommand],
      override val stash: Seq[Event],
      override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
      override val activeProducers: Map[String, ActorRef[Producer.Event]],
      override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
      override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
      override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      override val subscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Subscriber.Event]],
      override val unsubscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Unsubscriber.Event]],
      override val settings: MqttSessionSettings) extends Data(
        stash,
        activeConsumers,
        activeProducers,
        pendingLocalPublications,
        pendingRemotePublications,
        consumerPacketRouter,
        producerPacketRouter,
        subscriberPacketRouter,
        unsubscriberPacketRouter,
        settings)

  final case class WaitingForQueueOfferResult(nextBehavior: Behavior[Event], stash: Seq[Event])

  sealed abstract class Event(val connectionId: ByteString)

  final case class ConnectReceivedLocally(override val connectionId: ByteString,
      connect: Connect,
      connectData: ConnectData,
      remote: Promise[Source[ForwardConnectCommand, NotUsed]])
      extends Event(connectionId)
  final case class ConnAckReceivedFromRemote(override val connectionId: ByteString,
      connAck: ConnAck,
      local: Promise[ForwardConnAck])
      extends Event(connectionId)

  case class ReceiveConnAckTimeout(override val connectionId: ByteString) extends Event(connectionId)

  case class ConnectionLost(override val connectionId: ByteString) extends Event(connectionId)

  final case class DisconnectReceivedLocally(override val connectionId: ByteString,
      remote: Promise[ForwardDisconnect.type])
      extends Event(connectionId)

  final case class SubscribeReceivedLocally(override val connectionId: ByteString,
      subscribe: Subscribe,
      subscribeData: Subscriber.SubscribeData,
      remote: Promise[Subscriber.ForwardSubscribe])
      extends Event(connectionId)

  final case class PublishReceivedFromRemote(override val connectionId: ByteString,
      publish: Publish,
      local: Promise[Consumer.ForwardPublish.type])
      extends Event(connectionId)

  final case class ConsumerFree(topicName: String) extends Event(ByteString.empty)

  final case class PublishReceivedLocally(publish: Publish, publishData: Producer.PublishData)
      extends Event(ByteString.empty)

  final case class ProducerFree(topicName: String) extends Event(ByteString.empty)

  case class SendPingReqTimeout(override val connectionId: ByteString) extends Event(connectionId)

  final case class PingRespReceivedFromRemote(override val connectionId: ByteString,
      local: Promise[ForwardPingResp.type])
      extends Event(connectionId)

  final case class ReceivedProducerPublishingCommand(command: Producer.ForwardPublishingCommand)
      extends Event(ByteString.empty)

  final case class UnsubscribeReceivedLocally(override val connectionId: ByteString,
      unsubscribe: Unsubscribe,
      unsubscribeData: Unsubscriber.UnsubscribeData,
      remote: Promise[Unsubscriber.ForwardUnsubscribe])
      extends Event(connectionId)

  final case class QueueOfferCompleted(override val connectionId: ByteString,
      result: Either[Throwable, QueueOfferResult])
      extends Event(connectionId)
      with QueueOfferState.QueueOfferCompleted

  sealed abstract class Command
  sealed abstract class ForwardConnectCommand
  case object ForwardConnect extends ForwardConnectCommand
  case object ForwardPingReq extends ForwardConnectCommand
  final case class ForwardPublish(publish: Publish, packetId: Option[PacketId]) extends ForwardConnectCommand
  final case class ForwardPubRel(packetId: PacketId) extends ForwardConnectCommand
  final case class ForwardConnAck(connectData: ConnectData) extends Command
  case object ForwardDisconnect extends Command
  case object ForwardPingResp extends Command

  // State event handling

  private val ConsumerNamePrefix = "consumer-"
  private val ProducerNamePrefix = "producer-"

  def disconnected(data: Disconnected)(implicit mat: Materializer): Behavior[Event] =
    Behaviors
      .receivePartial[Event] {
        case (context, ConnectReceivedLocally(connectionId, connect, connectData, remote)) =>
          val (queue, source) = Source
            .queue[ForwardConnectCommand](data.settings.clientSendBufferSize, OverflowStrategy.backpressure)
            .toMat(BroadcastHub.sink)(Keep.both)
            .run()

          remote.success(source)

          val nextState =
            if (connect.connectFlags.contains(ConnectFlags.CleanSession)) {
              context.children.foreach(context.stop)

              serverConnect(
                ConnectReceived(
                  connectionId,
                  connect,
                  connectData,
                  queue,
                  Vector.empty,
                  Map.empty,
                  Map.empty,
                  Vector.empty,
                  Vector.empty,
                  data.consumerPacketRouter,
                  data.producerPacketRouter,
                  data.subscriberPacketRouter,
                  data.unsubscriberPacketRouter,
                  data.settings))
            } else {
              data.activeProducers.values.foreach { producer =>
                producer ! Producer.ReceiveConnect
              }

              serverConnect(
                ConnectReceived(
                  connectionId,
                  connect,
                  connectData,
                  queue,
                  Vector.empty,
                  data.activeConsumers,
                  data.activeProducers,
                  data.pendingLocalPublications,
                  data.pendingRemotePublications,
                  data.consumerPacketRouter,
                  data.producerPacketRouter,
                  data.subscriberPacketRouter,
                  data.unsubscriberPacketRouter,
                  data.settings))
            }

          QueueOfferState.waitForQueueOfferCompleted(
            queue.offer(ForwardConnect),
            result => QueueOfferCompleted(connectionId, result.toEither),
            nextState,
            data.stash)

        case (_, ConnectionLost(_)) =>
          Behaviors.same
        case (_, e) =>
          disconnected(data.copy(stash = data.stash :+ e))
      }