in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala [230:443]
/**
 * Creates the initial behavior for a client connection from a received `Connect`.
 *
 * The state data starts out empty (no stash, no publishers, no active consumers or
 * producers, no pending publications) and is handed straight to [[clientConnect]].
 *
 * @param connect the connect packet that opened this connection
 * @param local promise stored in the initial state; `clientConnect` completes it with `ForwardConnect`
 * @param consumerPacketRouter router stored in the state data for consumer events
 * @param producerPacketRouter router stored in the state data for producer events
 * @param publisherPacketRouter router stored in the state data for publisher events
 * @param unpublisherPacketRouter router stored in the state data for unpublisher events
 * @param settings session settings stored in the state data
 * @return the behavior returned by `clientConnect` for the freshly built state
 */
def apply(connect: Connect,
    local: Promise[ForwardConnect.type],
    consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
    producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
    publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
    unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
    settings: MqttSessionSettings)(implicit mat: Materializer): Behavior[Event] = {
  // Spell out every field by name so the many same-typed empties cannot be mixed up.
  val initialData = ConnectReceived(
    connect = connect,
    local = local,
    stash = Vector.empty,
    publishers = Set.empty,
    activeConsumers = Map.empty,
    activeProducers = Map.empty,
    pendingLocalPublications = Vector.empty,
    pendingRemotePublications = Vector.empty,
    consumerPacketRouter = consumerPacketRouter,
    producerPacketRouter = producerPacketRouter,
    publisherPacketRouter = publisherPacketRouter,
    unpublisherPacketRouter = unpublisherPacketRouter,
    settings = settings)
  clientConnect(initialData)
}
// Our FSM data, FSM events and commands emitted by the FSM
/**
 * State data common to every state of this FSM. The concrete state classes extend it and
 * pass their own values through to these fields.
 *
 * @param stash events set aside for later; `clientConnect` appends every event it does not
 *              handle itself
 * @param publishers strings matched against a publish's topic name via `Topics.filter` in
 *                   `clientConnect` (presumably the subscribed topic filters; TODO confirm)
 * @param activeConsumers consumer actors keyed by a string (presumably the topic name; TODO confirm)
 * @param activeProducers producer actors keyed by a string (presumably the topic name; TODO confirm);
 *                        `clientConnect` sends each of them `Producer.ReceiveConnect` on ConnAck
 * @param pendingLocalPublications string/event pairs for locally received publishes that are
 *                                 still pending (key presumably the topic name; TODO confirm)
 * @param pendingRemotePublications string/event pairs for remotely received publishes that are
 *                                  still pending (key presumably the topic name; TODO confirm)
 * @param consumerPacketRouter packet router for consumer events
 * @param producerPacketRouter packet router for producer events
 * @param publisherPacketRouter packet router for publisher events
 * @param unpublisherPacketRouter packet router for unpublisher events
 * @param settings session settings; `clientConnect` reads `receiveConnAckTimeout` and
 *                 `serverSendBufferSize` from it
 */
sealed abstract class Data(val stash: Seq[Event],
val publishers: Set[String],
val activeConsumers: Map[String, ActorRef[Consumer.Event]],
val activeProducers: Map[String, ActorRef[Producer.Event]],
val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
val publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
val unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
val settings: MqttSessionSettings)
/**
 * State data consumed by `clientConnect`: a `Connect` has been received and no ConnAck has
 * been replied yet.
 *
 * @param connect the received connect packet; carried over into `ConnAckReplied`
 * @param local promise that `clientConnect` completes with `ForwardConnect`
 *
 * The remaining parameters override the corresponding fields of `Data` unchanged.
 */
final case class ConnectReceived(
connect: Connect,
local: Promise[ForwardConnect.type],
override val stash: Seq[Event],
override val publishers: Set[String],
override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
override val activeProducers: Map[String, ActorRef[Producer.Event]],
override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
override val publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
override val unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
override val settings: MqttSessionSettings) extends Data(
stash,
publishers,
activeConsumers,
activeProducers,
pendingLocalPublications,
pendingRemotePublications,
consumerPacketRouter,
producerPacketRouter,
publisherPacketRouter,
unpublisherPacketRouter,
settings)
/**
 * State data built by `clientConnect` once a ConnAck has been received locally, and passed
 * on to `clientConnected`.
 *
 * @param connect the connect packet carried over from `ConnectReceived`
 * @param remote queue accepting `ForwardConnAckCommand` elements; `clientConnect` creates it
 *               and offers `ForwardConnAck` as its first element
 *
 * The remaining parameters override the corresponding fields of `Data` unchanged.
 */
final case class ConnAckReplied(
connect: Connect,
remote: SourceQueueWithComplete[ForwardConnAckCommand],
override val stash: Seq[Event],
override val publishers: Set[String],
override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
override val activeProducers: Map[String, ActorRef[Producer.Event]],
override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
override val publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
override val unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
override val settings: MqttSessionSettings) extends Data(
stash,
publishers,
activeConsumers,
activeProducers,
pendingLocalPublications,
pendingRemotePublications,
consumerPacketRouter,
producerPacketRouter,
publisherPacketRouter,
unpublisherPacketRouter,
settings)
/**
 * State data that carries only the common `Data` fields: unlike `ConnectReceived` and
 * `ConnAckReplied` it holds neither a connect packet nor a queue/promise.
 *
 * NOTE(review): presumably used while no client connection is established; the state
 * handler that consumes it is not visible from here, so confirm against it.
 */
final case class Disconnected(
override val stash: Seq[Event],
override val publishers: Set[String],
override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
override val activeProducers: Map[String, ActorRef[Producer.Event]],
override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
override val publisherPacketRouter: ActorRef[RemotePacketRouter.Request[Publisher.Event]],
override val unpublisherPacketRouter: ActorRef[RemotePacketRouter.Request[Unpublisher.Event]],
override val settings: MqttSessionSettings) extends Data(
stash,
publishers,
activeConsumers,
activeProducers,
pendingLocalPublications,
pendingRemotePublications,
consumerPacketRouter,
producerPacketRouter,
publisherPacketRouter,
unpublisherPacketRouter,
settings)
/** Messages processed by this FSM's behaviors (the message type of `Behavior[Event]`). */
sealed abstract class Event
// Timer message scheduled by `clientConnect` after `settings.receiveConnAckTimeout`; handling it
// there throws `ClientConnectionFailed`.
case object ReceiveConnAckTimeout extends Event
// A ConnAck was produced locally. `clientConnect` completes `remote` with the source fed by the
// queue it creates for outgoing `ForwardConnAckCommand`s.
final case class ConnAckReceivedLocally(connAck: ConnAck, remote: Promise[Source[ForwardConnAckCommand, NotUsed]])
extends Event
// A Subscribe arrived from the remote side, together with the promise used to answer it.
final case class SubscribeReceivedFromRemote(subscribe: Subscribe, local: Promise[Publisher.ForwardSubscribe.type])
extends Event
final case class Subscribed(subscribe: Subscribe) extends Event
// A Publish arrived from the remote side, together with the promise used to answer it.
final case class PublishReceivedFromRemote(publish: Publish, local: Promise[Consumer.ForwardPublish.type])
extends Event
final case class ConsumerFree(topicName: String) extends Event
// A Publish was issued locally. `clientConnect` ignores it when none of `data.publishers`
// matches the publish's topic name, and stashes it otherwise.
final case class PublishReceivedLocally(publish: Publish, publishData: Producer.PublishData) extends Event
final case class ProducerFree(topicName: String) extends Event
// An Unsubscribe arrived from the remote side, together with the promise used to answer it.
final case class UnsubscribeReceivedFromRemote(unsubscribe: Unsubscribe,
local: Promise[Unpublisher.ForwardUnsubscribe.type])
extends Event
final case class Unsubscribed(unsubscribe: Unsubscribe) extends Event
final case class PingReqReceivedFromRemote(local: Promise[ForwardPingReq.type]) extends Event
final case class DisconnectReceivedFromRemote(local: Promise[ForwardDisconnect.type]) extends Event
case object ConnectionLost extends Event
case object ReceivePingReqTimeout extends Event
final case class ReceivedProducerPublishingCommand(command: Producer.ForwardPublishingCommand) extends Event
final case class ConnectReceivedFromRemote(connect: Connect, local: Promise[ClientConnection.ForwardConnect.type])
extends Event
case object ReceiveConnectTimeout extends Event
// Outcome of a queue offer, as either the failure or the offer result; `clientConnect` builds it
// from the offer's `Try` via `result.toEither`.
final case class QueueOfferCompleted(result: Either[Throwable, QueueOfferResult])
extends Event
with QueueOfferState.QueueOfferCompleted
/** Commands emitted by the FSM that are not sent through the ConnAck command queue. */
sealed abstract class Command
// Value with which `clientConnect` completes the `local` promise of `ConnectReceived`.
case object ForwardConnect extends Command
/**
 * Commands that are element type of the queue/source created in `clientConnect`
 * (`ConnAckReplied.remote`).
 */
sealed abstract class ForwardConnAckCommand
// First element `clientConnect` offers to the newly created queue.
case object ForwardConnAck extends ForwardConnAckCommand
case object ForwardPingReq extends Command
case object ForwardPingResp extends ForwardConnAckCommand
case object ForwardDisconnect extends Command
final case class ForwardPublish(publish: Publish, packetId: Option[PacketId]) extends ForwardConnAckCommand
final case class ForwardPubRel(packetId: PacketId) extends ForwardConnAckCommand
// State event handling
// Name prefixes; presumably prepended to a topic-derived suffix when naming consumer/producer
// child actors. The use sites are not visible from here, so confirm before relying on this.
private val ConsumerNamePrefix = "consumer-"
private val ProducerNamePrefix = "producer-"
/**
 * Behavior for a connection whose `Connect` has been received and which is waiting for the
 * ConnAck to be produced locally.
 *
 * On start it completes `data.local` with `ForwardConnect` and makes sure a single
 * "receive-connack" timer is running. It then reacts as follows:
 *
 *  - `ConnAckReceivedLocally`: creates the outgoing command queue, completes the event's
 *    promise with the matching source, cancels the timer, tells every active producer
 *    `Producer.ReceiveConnect`, offers `ForwardConnAck` to the queue and, via
 *    `QueueOfferState.waitForQueueOfferCompleted`, hands over to `clientConnected`
 *    together with the events stashed so far.
 *  - `ReceiveConnAckTimeout` or `ClientConnection.ConnectionLost`: throws `ClientConnectionFailed`.
 *  - `PublishReceivedLocally` for a topic no publisher filter matches: ignored.
 *  - any other event: appended to the stash and the behavior is re-entered.
 *
 * `Terminated` signals are ignored.
 *
 * @param data state data for the pending connect
 * @param mat materializer used to run the outgoing command queue
 */
def clientConnect(data: ConnectReceived)(implicit mat: Materializer): Behavior[Event] = Behaviors.setup { context =>
  context.log.debug("clientConnect stash={}", data.stash)
  // trySuccess: this behavior is re-entered with the same promise for every stashed event,
  // so the promise may already be completed.
  data.local.trySuccess(ForwardConnect)

  Behaviors.withTimers { timer =>
    val connAckTimerKey = "receive-connack"
    // Re-entry must not restart the timeout, hence the check before scheduling.
    val timerAlreadyRunning = timer.isTimerActive(connAckTimerKey)
    if (!timerAlreadyRunning) {
      timer.startSingleTimer(connAckTimerKey, ReceiveConnAckTimeout, data.settings.receiveConnAckTimeout)
    }

    val onEvent = Behaviors.receivePartial[Event] {
      case (_, ConnAckReceivedLocally(_, remote)) =>
        // Queue feeding a broadcast hub: the queue end stays in the state data, the source end
        // is handed back through the event's promise.
        val (sendQueue, broadcastSource) = Source
          .queue[ForwardConnAckCommand](data.settings.serverSendBufferSize, OverflowStrategy.backpressure)
          .toMat(BroadcastHub.sink)(Keep.both)
          .run()
        remote.success(broadcastSource)
        timer.cancel(connAckTimerKey)

        for (producer <- data.activeProducers.values) {
          producer ! Producer.ReceiveConnect
        }

        // The stash is emptied in the new state data; the stashed events themselves are passed
        // to waitForQueueOfferCompleted below.
        val connectedData = ConnAckReplied(
          connect = data.connect,
          remote = sendQueue,
          stash = Vector.empty,
          publishers = data.publishers,
          activeConsumers = data.activeConsumers,
          activeProducers = data.activeProducers,
          pendingLocalPublications = data.pendingLocalPublications,
          pendingRemotePublications = data.pendingRemotePublications,
          consumerPacketRouter = data.consumerPacketRouter,
          producerPacketRouter = data.producerPacketRouter,
          publisherPacketRouter = data.publisherPacketRouter,
          unpublisherPacketRouter = data.unpublisherPacketRouter,
          settings = data.settings)

        QueueOfferState.waitForQueueOfferCompleted(
          sendQueue.offer(ForwardConnAck),
          result => QueueOfferCompleted(result.toEither),
          clientConnected(connectedData),
          stash = data.stash)

      case (_, ReceiveConnAckTimeout | ClientConnection.ConnectionLost) =>
        throw ClientConnectionFailed

      case (_, PublishReceivedLocally(publish, _))
          if !data.publishers.exists(topicFilter => Topics.filter(topicFilter, publish.topicName)) =>
        // No publisher filter matches this topic: ignore the publish.
        Behaviors.same

      case (_, other) =>
        // Not handled while waiting for the ConnAck: keep it for later.
        val stashedData = data.copy(stash = data.stash :+ other)
        clientConnect(stashedData)
    }

    onEvent.receiveSignal {
      case (_, _: Terminated) =>
        Behaviors.same
    }
  }
}