in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala [57:127]
/**
 * Builds the initial behavior of this state machine.
 *
 * The supplied collaborators are bundled into a [[Data]] value whose
 * `clientConnections` map starts out empty, and that value is handed to
 * `listening`.
 *
 * @param terminations queue typed to carry [[ClientSessionTerminated]] elements
 * @param consumerPacketRouter router actor for `Consumer.Event` requests
 * @param producerPacketRouter router actor for `Producer.Event` requests
 * @param publisherPacketRouter router actor for `Publisher.Event` requests
 * @param unpublisherPacketRouter router actor for `Unpublisher.Event` requests
 * @param settings session settings stored in the state data
 * @param mat materializer available implicitly to `listening`
 * @return the behavior produced by `listening` for the initial data
 */
def apply(terminations: SourceQueueWithComplete[ClientSessionTerminated],
    consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
    producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
    publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
    unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
    settings: MqttSessionSettings)(implicit mat: Materializer): Behavior[Event] = {
  // No client connections are registered when the state machine starts.
  val initialData = Data(
    clientConnections = Map.empty,
    terminations = terminations,
    consumerPacketRouter = consumerPacketRouter,
    producerPacketRouter = producerPacketRouter,
    publisherPacketRouter = publisherPacketRouter,
    unpublisherPacketRouter = unpublisherPacketRouter,
    settings = settings)
  listening(initialData)
}
// Our FSM data, FSM events and commands emitted by the FSM
/**
 * State data carried by the behaviors of this state machine.
 *
 * @param clientConnections map from a `ByteString` key (looked up by connection id
 *                          in `forward`) to a pair of a `String` and the
 *                          `ClientConnection` actor for that key.
 *                          NOTE(review): the `String` is presumably the client id - confirm.
 * @param terminations queue typed to carry [[ClientSessionTerminated]] elements
 * @param consumerPacketRouter router actor for `Consumer.Event` requests
 * @param producerPacketRouter router actor for `Producer.Event` requests
 * @param publisherPacketRouter router actor for `Publisher.Event` requests
 * @param unpublisherPacketRouter router actor for `Unpublisher.Event` requests
 * @param settings MQTT session settings
 */
final case class Data(clientConnections: Map[ByteString, (String, ActorRef[ClientConnection.Event])],
terminations: SourceQueueWithComplete[ClientSessionTerminated],
consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
settings: MqttSessionSettings)
/**
 * Base type of every event handled by this state machine. It is sealed, so all
 * subtypes are declared in this file.
 *
 * @param connectionId identifier of the connection the event relates to
 */
sealed abstract class Event(val connectionId: ByteString)
/**
 * Event carrying a `Connect` for a given connection.
 * NOTE(review): by its name, raised when a CONNECT arrives from the remote peer;
 * the producer of this event is outside this view - confirm.
 *
 * @param connectionId identifier of the connection
 * @param connect the `Connect` value carried by the event
 * @param local promise typed to `ClientConnection.ForwardConnect`; who completes it
 *              is not visible here
 */
final case class ConnectReceivedFromRemote(override val connectionId: ByteString,
connect: Connect,
local: Promise[ClientConnection.ForwardConnect.type])
extends Event(connectionId)
/**
 * Event carrying only a connection id.
 * NOTE(review): the name suggests a timeout while waiting for a ConnAck; where it
 * is scheduled is outside this view - confirm.
 *
 * @param connectionId identifier of the connection
 */
final case class ReceiveConnAckTimeout(override val connectionId: ByteString) extends Event(connectionId)
/**
 * Event carrying a `ConnAck` for a given connection.
 * NOTE(review): by its name, raised when the local side supplies the ConnAck - confirm.
 *
 * @param connectionId identifier of the connection
 * @param connAck the `ConnAck` value carried by the event
 * @param remote promise of a `Source` of `ClientConnection.ForwardConnAckCommand`;
 *               who completes it is not visible here
 */
final case class ConnAckReceivedLocally(override val connectionId: ByteString,
connAck: ConnAck,
remote: Promise[Source[ClientConnection.ForwardConnAckCommand, NotUsed]])
extends Event(connectionId)
/**
 * Event carrying a `Subscribe` for a given connection.
 * NOTE(review): by its name, raised when a SUBSCRIBE arrives from the remote peer - confirm.
 *
 * @param connectionId identifier of the connection
 * @param subscribe the `Subscribe` value carried by the event
 * @param local promise typed to `Publisher.ForwardSubscribe`; who completes it is
 *              not visible here
 */
final case class SubscribeReceivedFromRemote(override val connectionId: ByteString,
subscribe: Subscribe,
local: Promise[Publisher.ForwardSubscribe.type])
extends Event(connectionId)
/**
 * Event carrying a `Publish` for a given connection.
 * NOTE(review): by its name, raised when a PUBLISH arrives from the remote peer - confirm.
 *
 * @param connectionId identifier of the connection
 * @param publish the `Publish` value carried by the event
 * @param local promise typed to `Consumer.ForwardPublish`; who completes it is not
 *              visible here
 */
final case class PublishReceivedFromRemote(override val connectionId: ByteString,
publish: Publish,
local: Promise[Consumer.ForwardPublish.type])
extends Event(connectionId)
/**
 * Event carrying a `Publish` together with producer publish data.
 *
 * Unlike the other events it takes no connection id of its own: the base
 * `connectionId` is always `ByteString.empty`.
 * NOTE(review): presumably because a locally originated publish is not bound to
 * one connection - confirm against the handler.
 *
 * @param publish the `Publish` value carried by the event
 * @param publishData the `Producer.PublishData` accompanying the publish
 */
final case class PublishReceivedLocally(publish: Publish, publishData: Producer.PublishData)
extends Event(ByteString.empty)
/**
 * Event carrying an `Unsubscribe` for a given connection.
 * NOTE(review): by its name, raised when an UNSUBSCRIBE arrives from the remote peer - confirm.
 *
 * @param connectionId identifier of the connection
 * @param unsubscribe the `Unsubscribe` value carried by the event
 * @param local promise typed to `Unpublisher.ForwardUnsubscribe`; who completes it
 *              is not visible here
 */
final case class UnsubscribeReceivedFromRemote(override val connectionId: ByteString,
unsubscribe: Unsubscribe,
local: Promise[Unpublisher.ForwardUnsubscribe.type])
extends Event(connectionId)
/**
 * Event carrying a connection id and a promise, with no packet payload.
 * NOTE(review): by its name, raised when a PINGREQ arrives from the remote peer - confirm.
 *
 * @param connectionId identifier of the connection
 * @param local promise typed to `ClientConnection.ForwardPingReq`; who completes it
 *              is not visible here
 */
final case class PingReqReceivedFromRemote(override val connectionId: ByteString,
local: Promise[ClientConnection.ForwardPingReq.type])
extends Event(connectionId)
/**
 * Event carrying a connection id and a promise, with no packet payload.
 * NOTE(review): by its name, raised when a DISCONNECT arrives from the remote peer - confirm.
 *
 * @param connectionId identifier of the connection
 * @param local promise typed to `ClientConnection.ForwardDisconnect`; who completes
 *              it is not visible here
 */
final case class DisconnectReceivedFromRemote(override val connectionId: ByteString,
local: Promise[ClientConnection.ForwardDisconnect.type])
extends Event(connectionId)
/**
 * Event carrying only a connection id.
 * NOTE(review): by its name, signals that the given connection was lost; the
 * producer of this event is outside this view - confirm.
 *
 * @param connectionId identifier of the connection
 */
final case class ConnectionLost(override val connectionId: ByteString) extends Event(connectionId)
/**
 * Event reporting the outcome of a queue offer; it also mixes in
 * `QueueOfferState.QueueOfferCompleted`.
 * NOTE(review): presumably the offer is made to the `terminations` queue - the
 * call site is outside this view, confirm.
 *
 * @param connectionId identifier of the connection
 * @param result `Left` with the failure, or `Right` with the `QueueOfferResult`
 */
final case class QueueOfferCompleted(override val connectionId: ByteString,
result: Either[Throwable, QueueOfferResult])
extends Event(connectionId)
with QueueOfferState.QueueOfferCompleted
// State event handling
// Name prefix constant for client connections.
// NOTE(review): presumably prepended when naming child client-connection actors;
// its usage is outside this view - confirm.
private val ClientConnectionNamePrefix = "client-connection-"
/**
 * Sends `e` to the client connection actor registered under `connectionId`, if
 * there is one, and keeps the current behavior.
 *
 * When no entry exists for `connectionId` the event is not sent anywhere.
 *
 * @param connectionId key to look up in `clientConnections`
 * @param clientConnections registered connections; only the actor ref of the
 *                          matching entry is used
 * @param e event to deliver
 * @return `Behaviors.same`
 */
private def forward(connectionId: ByteString,
    clientConnections: Map[ByteString, (String, ActorRef[ClientConnection.Event])],
    e: ClientConnection.Event): Behavior[Event] = {
  clientConnections.get(connectionId) match {
    case Some((_, target)) => target ! e
    case None              => () // unknown connection id: nothing to deliver to
  }
  Behaviors.same
}