def apply()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala [57:127]


  def apply(terminations: SourceQueueWithComplete[ClientSessionTerminated],
      consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
      unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
      settings: MqttSessionSettings)(implicit mat: Materializer): Behavior[Event] =
    listening(
      Data(Map.empty,
        terminations,
        consumerPacketRouter,
        producerPacketRouter,
        publisherPacketRouter,
        unpublisherPacketRouter,
        settings))

  // Our FSM data, FSM events and commands emitted by the FSM

  final case class Data(clientConnections: Map[ByteString, (String, ActorRef[ClientConnection.Event])],
      terminations: SourceQueueWithComplete[ClientSessionTerminated],
      consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
      producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
      publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
      unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
      settings: MqttSessionSettings)

  sealed abstract class Event(val connectionId: ByteString)
  final case class ConnectReceivedFromRemote(override val connectionId: ByteString,
      connect: Connect,
      local: Promise[ClientConnection.ForwardConnect.type])
      extends Event(connectionId)
  final case class ReceiveConnAckTimeout(override val connectionId: ByteString) extends Event(connectionId)
  final case class ConnAckReceivedLocally(override val connectionId: ByteString,
      connAck: ConnAck,
      remote: Promise[Source[ClientConnection.ForwardConnAckCommand, NotUsed]])
      extends Event(connectionId)
  final case class SubscribeReceivedFromRemote(override val connectionId: ByteString,
      subscribe: Subscribe,
      local: Promise[Publisher.ForwardSubscribe.type])
      extends Event(connectionId)
  final case class PublishReceivedFromRemote(override val connectionId: ByteString,
      publish: Publish,
      local: Promise[Consumer.ForwardPublish.type])
      extends Event(connectionId)
  final case class PublishReceivedLocally(publish: Publish, publishData: Producer.PublishData)
      extends Event(ByteString.empty)
  final case class UnsubscribeReceivedFromRemote(override val connectionId: ByteString,
      unsubscribe: Unsubscribe,
      local: Promise[Unpublisher.ForwardUnsubscribe.type])
      extends Event(connectionId)
  final case class PingReqReceivedFromRemote(override val connectionId: ByteString,
      local: Promise[ClientConnection.ForwardPingReq.type])
      extends Event(connectionId)
  final case class DisconnectReceivedFromRemote(override val connectionId: ByteString,
      local: Promise[ClientConnection.ForwardDisconnect.type])
      extends Event(connectionId)
  final case class ConnectionLost(override val connectionId: ByteString) extends Event(connectionId)
  final case class QueueOfferCompleted(override val connectionId: ByteString,
      result: Either[Throwable, QueueOfferResult])
      extends Event(connectionId)
      with QueueOfferState.QueueOfferCompleted

  // State event handling

  private val ClientConnectionNamePrefix = "client-connection-"

  private def forward(connectionId: ByteString,
      clientConnections: Map[ByteString, (String, ActorRef[ClientConnection.Event])],
      e: ClientConnection.Event): Behavior[Event] = {
    clientConnections.get(connectionId).foreach { case (_, cc) => cc ! e }
    Behaviors.same
  }