in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala [57:299]
/**
 * Creates the initial behavior of the client connector FSM.
 *
 * The FSM starts in the `disconnected` state with an empty stash, no active
 * consumers or producers and no pending publications.
 *
 * @param consumerPacketRouter     router reference stored in the initial state data
 * @param producerPacketRouter     router reference stored in the initial state data
 * @param subscriberPacketRouter   router reference stored in the initial state data
 * @param unsubscriberPacketRouter router reference stored in the initial state data
 * @param settings                 session settings stored in the initial state data
 * @param mat                      materializer handed on implicitly to `disconnected`
 * @return the `disconnected` behavior for the freshly built state data
 */
def apply(consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
    producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
    subscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Subscriber.Event]],
    unsubscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Unsubscriber.Event]],
    settings: MqttSessionSettings)(implicit mat: Materializer): Behavior[Event] = {
  // Nothing is carried over at start-up: every collection begins empty.
  val initialData = Disconnected(
    stash = Vector.empty,
    activeConsumers = Map.empty,
    activeProducers = Map.empty,
    pendingLocalPublications = Vector.empty,
    pendingRemotePublications = Vector.empty,
    consumerPacketRouter = consumerPacketRouter,
    producerPacketRouter = producerPacketRouter,
    subscriberPacketRouter = subscriberPacketRouter,
    unsubscriberPacketRouter = unsubscriberPacketRouter,
    settings = settings)
  disconnected(initialData)
}
// Our FSM data, FSM events and commands emitted by the FSM
/**
 * Base type of the state data carried by the behaviors of this FSM.
 *
 * @param stash                     events set aside for later; `disconnected` appends every event it does
 *                                  not handle itself
 * @param activeConsumers           consumer actor references keyed by a string
 *                                  (presumably the topic name — TODO confirm against the code that fills it)
 * @param activeProducers           producer actor references keyed by a string
 *                                  (presumably the topic name — TODO confirm against the code that fills it)
 * @param pendingLocalPublications  `PublishReceivedLocally` events, each paired with a string key
 * @param pendingRemotePublications `PublishReceivedFromRemote` events, each paired with a string key
 * @param consumerPacketRouter      reference to the remote packet router for consumer events
 * @param producerPacketRouter      reference to the local packet router for producer events
 * @param subscriberPacketRouter    reference to the local packet router for subscriber events
 * @param unsubscriberPacketRouter  reference to the local packet router for unsubscriber events
 * @param settings                  session settings; `disconnected` reads `clientSendBufferSize` from it
 */
sealed abstract class Data(val stash: Seq[Event],
val activeConsumers: Map[String, ActorRef[Consumer.Event]],
val activeProducers: Map[String, ActorRef[Producer.Event]],
val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
val subscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Subscriber.Event]],
val unsubscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Unsubscriber.Event]],
val settings: MqttSessionSettings)
/**
 * State data of the `disconnected` behavior.
 *
 * It declares no fields of its own; every constructor parameter overrides the
 * member of the same name in `Data` and is passed straight to the `Data` constructor.
 */
final case class Disconnected(
override val stash: Seq[Event],
override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
override val activeProducers: Map[String, ActorRef[Producer.Event]],
override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
override val subscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Subscriber.Event]],
override val unsubscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Unsubscriber.Event]],
override val settings: MqttSessionSettings) extends Data(
stash,
activeConsumers,
activeProducers,
pendingLocalPublications,
pendingRemotePublications,
consumerPacketRouter,
producerPacketRouter,
subscriberPacketRouter,
unsubscriberPacketRouter,
settings)
/**
 * State data built by `disconnected` when a `ConnectReceivedLocally` event arrives and
 * handed to `serverConnect`.
 *
 * Beyond the members inherited from `Data` it carries:
 *
 * @param connectionId id of the connection, taken from the triggering event
 * @param connect      the connect packet, taken from the triggering event
 * @param connectData  the data that accompanied the connect packet in the triggering event
 * @param remote       queue through which `ForwardConnectCommand`s are offered; `disconnected`
 *                     passes the queue it has just materialized
 */
final case class ConnectReceived(
connectionId: ByteString,
connect: Connect,
connectData: ConnectData,
remote: SourceQueueWithComplete[ForwardConnectCommand],
override val stash: Seq[Event],
override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
override val activeProducers: Map[String, ActorRef[Producer.Event]],
override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
override val subscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Subscriber.Event]],
override val unsubscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Unsubscriber.Event]],
override val settings: MqttSessionSettings) extends Data(
stash,
activeConsumers,
activeProducers,
pendingLocalPublications,
pendingRemotePublications,
consumerPacketRouter,
producerPacketRouter,
subscriberPacketRouter,
unsubscriberPacketRouter,
settings)
/**
 * State data that extends `Data` with connection-level details.
 *
 * NOTE(review): the name suggests this is the data used once a CONNACK has been received;
 * the behavior that consumes it is not visible in this part of the file — confirm there.
 *
 * @param connectionId    id of the connection
 * @param connectFlags    connect flags associated with the connection
 * @param keepAlive       keep-alive duration (how it is applied is not visible here)
 * @param pendingPingResp flag; presumably true while a ping response is outstanding — TODO confirm
 * @param remote          queue through which `ForwardConnectCommand`s are offered
 */
final case class ConnAckReceived(
connectionId: ByteString,
connectFlags: ConnectFlags,
keepAlive: FiniteDuration,
pendingPingResp: Boolean,
remote: SourceQueueWithComplete[ForwardConnectCommand],
override val stash: Seq[Event],
override val activeConsumers: Map[String, ActorRef[Consumer.Event]],
override val activeProducers: Map[String, ActorRef[Producer.Event]],
override val pendingLocalPublications: Seq[(String, PublishReceivedLocally)],
override val pendingRemotePublications: Seq[(String, PublishReceivedFromRemote)],
override val consumerPacketRouter: ActorRef[RemotePacketRouter.Request[Consumer.Event]],
override val producerPacketRouter: ActorRef[LocalPacketRouter.Request[Producer.Event]],
override val subscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Subscriber.Event]],
override val unsubscriberPacketRouter: ActorRef[LocalPacketRouter.Request[Unsubscriber.Event]],
override val settings: MqttSessionSettings) extends Data(
stash,
activeConsumers,
activeProducers,
pendingLocalPublications,
pendingRemotePublications,
consumerPacketRouter,
producerPacketRouter,
subscriberPacketRouter,
unsubscriberPacketRouter,
settings)
/**
 * Pairs a behavior with a stash of events. Unlike the other state data classes declared
 * alongside it, it does not extend `Data`.
 *
 * NOTE(review): no use of this class is visible in this part of the file — confirm it is still needed.
 *
 * @param nextBehavior the behavior held by this value
 * @param stash        the events held by this value
 */
final case class WaitingForQueueOfferResult(nextBehavior: Behavior[Event], stash: Seq[Event])
/**
 * Base type of every event handled by the FSM.
 *
 * @param connectionId id of the connection the event relates to; subclasses that take no
 *                     connection id of their own pass `ByteString.empty`
 */
sealed abstract class Event(val connectionId: ByteString)
/**
 * Event asking the FSM to connect.
 *
 * @param connectionId id of the connection being established
 * @param connect      the connect packet; `disconnected` checks its connect flags for `CleanSession`
 * @param connectData  data accompanying the connect packet; carried into the `ConnectReceived` state data
 * @param remote       promise that `disconnected` completes with the source of `ForwardConnectCommand`s
 */
final case class ConnectReceivedLocally(override val connectionId: ByteString,
connect: Connect,
connectData: ConnectData,
remote: Promise[Source[ForwardConnectCommand, NotUsed]])
extends Event(connectionId)
/**
 * Event carrying a `ConnAck` packet together with a promise of a `ForwardConnAck`.
 *
 * NOTE(review): the handler that completes `local` is outside this part of the file.
 *
 * @param connectionId id of the connection the packet belongs to
 * @param connAck      the received packet
 * @param local        promise to be completed with a `ForwardConnAck`
 */
final case class ConnAckReceivedFromRemote(override val connectionId: ByteString,
connAck: ConnAck,
local: Promise[ForwardConnAck])
extends Event(connectionId)
/**
 * Event carrying only a connection id.
 *
 * NOTE(review): the name suggests it signals that no CONNACK arrived in time; the code that
 * emits it is outside this part of the file.
 *
 * Declared `final`, like the other event case classes, so that it cannot be subclassed.
 *
 * @param connectionId id of the connection the timeout relates to
 */
final case class ReceiveConnAckTimeout(override val connectionId: ByteString) extends Event(connectionId)
/**
 * Event reporting that a connection was lost. The `disconnected` behavior ignores it.
 *
 * Declared `final`, like the other event case classes, so that it cannot be subclassed.
 *
 * @param connectionId id of the connection that was lost
 */
final case class ConnectionLost(override val connectionId: ByteString) extends Event(connectionId)
/**
 * Event carrying a promise of `ForwardDisconnect` for a given connection.
 *
 * NOTE(review): the handler that completes `remote` is outside this part of the file.
 *
 * @param connectionId id of the connection to disconnect
 * @param remote       promise to be completed with `ForwardDisconnect`
 */
final case class DisconnectReceivedLocally(override val connectionId: ByteString,
remote: Promise[ForwardDisconnect.type])
extends Event(connectionId)
/**
 * Event carrying a `Subscribe` packet, its accompanying data and a promise of a
 * `Subscriber.ForwardSubscribe`.
 *
 * NOTE(review): the handler that completes `remote` is outside this part of the file.
 *
 * @param connectionId  id of the connection the request belongs to
 * @param subscribe     the subscribe packet
 * @param subscribeData data accompanying the packet
 * @param remote        promise to be completed with a `Subscriber.ForwardSubscribe`
 */
final case class SubscribeReceivedLocally(override val connectionId: ByteString,
subscribe: Subscribe,
subscribeData: Subscriber.SubscribeData,
remote: Promise[Subscriber.ForwardSubscribe])
extends Event(connectionId)
/**
 * Event carrying a `Publish` packet and a promise of `Consumer.ForwardPublish`.
 * Instances of this event are also what `Data.pendingRemotePublications` stores.
 *
 * NOTE(review): the handler that completes `local` is outside this part of the file.
 *
 * @param connectionId id of the connection the packet belongs to
 * @param publish      the publish packet
 * @param local        promise to be completed with `Consumer.ForwardPublish`
 */
final case class PublishReceivedFromRemote(override val connectionId: ByteString,
publish: Publish,
local: Promise[Consumer.ForwardPublish.type])
extends Event(connectionId)
/**
 * Event naming a topic; it is not tied to a connection and passes `ByteString.empty` as its connection id.
 *
 * NOTE(review): presumably signals that the consumer for `topicName` is no longer busy — confirm at the emit site.
 *
 * @param topicName name of the topic concerned
 */
final case class ConsumerFree(topicName: String) extends Event(ByteString.empty)
/**
 * Event carrying a `Publish` packet and its accompanying data; it passes `ByteString.empty`
 * as its connection id. Instances of this event are also what `Data.pendingLocalPublications` stores.
 *
 * @param publish     the publish packet
 * @param publishData data accompanying the packet
 */
final case class PublishReceivedLocally(publish: Publish, publishData: Producer.PublishData)
extends Event(ByteString.empty)
/**
 * Event naming a topic; it is not tied to a connection and passes `ByteString.empty` as its connection id.
 *
 * NOTE(review): presumably signals that the producer for `topicName` is no longer busy — confirm at the emit site.
 *
 * @param topicName name of the topic concerned
 */
final case class ProducerFree(topicName: String) extends Event(ByteString.empty)
/**
 * Event carrying only a connection id.
 *
 * NOTE(review): the name suggests it is a timer event prompting a PINGREQ to be sent; the code
 * that emits and handles it is outside this part of the file.
 *
 * Declared `final`, like the other event case classes, so that it cannot be subclassed.
 *
 * @param connectionId id of the connection the timeout relates to
 */
final case class SendPingReqTimeout(override val connectionId: ByteString) extends Event(connectionId)
/**
 * Event carrying a promise of `ForwardPingResp` for a given connection.
 *
 * NOTE(review): the handler that completes `local` is outside this part of the file.
 *
 * @param connectionId id of the connection the ping response belongs to
 * @param local        promise to be completed with `ForwardPingResp`
 */
final case class PingRespReceivedFromRemote(override val connectionId: ByteString,
local: Promise[ForwardPingResp.type])
extends Event(connectionId)
/**
 * Event wrapping a `Producer.ForwardPublishingCommand`; it passes `ByteString.empty` as its connection id.
 *
 * @param command the wrapped producer command
 */
final case class ReceivedProducerPublishingCommand(command: Producer.ForwardPublishingCommand)
extends Event(ByteString.empty)
/**
 * Event carrying an `Unsubscribe` packet, its accompanying data and a promise of an
 * `Unsubscriber.ForwardUnsubscribe`.
 *
 * NOTE(review): the handler that completes `remote` is outside this part of the file.
 *
 * @param connectionId    id of the connection the request belongs to
 * @param unsubscribe     the unsubscribe packet
 * @param unsubscribeData data accompanying the packet
 * @param remote          promise to be completed with an `Unsubscriber.ForwardUnsubscribe`
 */
final case class UnsubscribeReceivedLocally(override val connectionId: ByteString,
unsubscribe: Unsubscribe,
unsubscribeData: Unsubscriber.UnsubscribeData,
remote: Promise[Unsubscriber.ForwardUnsubscribe])
extends Event(connectionId)
/**
 * Event reporting the outcome of a queue offer. It also mixes in
 * `QueueOfferState.QueueOfferCompleted`. The `disconnected` behavior builds it from the
 * `toEither` of the offer's result.
 *
 * @param connectionId id of the connection the offer was made for
 * @param result       `Left` with the failure, or `Right` with the `QueueOfferResult`
 */
final case class QueueOfferCompleted(override val connectionId: ByteString,
result: Either[Throwable, QueueOfferResult])
extends Event(connectionId)
with QueueOfferState.QueueOfferCompleted
// Two separate sealed hierarchies are declared here:
//  - `ForwardConnectCommand`: the element type of the queue materialized in `disconnected`
//    (`ForwardConnect`, `ForwardPingReq`, `ForwardPublish`, `ForwardPubRel`);
//  - `Command`: `ForwardConnAck`, `ForwardDisconnect` and `ForwardPingResp`, which appear as the
//    result types of the promises carried by the corresponding events.

/** Base type of the commands that are not `ForwardConnectCommand`s. */
sealed abstract class Command
/** Base type of the commands offered to the `remote` queue of a connection. */
sealed abstract class ForwardConnectCommand
/** Offered to the queue by `disconnected` right after the queue has been materialized. */
case object ForwardConnect extends ForwardConnectCommand
/** NOTE(review): presumably asks for a PINGREQ to be forwarded; the emit site is not visible here. */
case object ForwardPingReq extends ForwardConnectCommand
/** Carries a `Publish` packet and an optional packet id. */
final case class ForwardPublish(publish: Publish, packetId: Option[PacketId]) extends ForwardConnectCommand
/** Carries the packet id of a PUBREL. */
final case class ForwardPubRel(packetId: PacketId) extends ForwardConnectCommand
/** Carries the `ConnectData`; result type of the promise in `ConnAckReceivedFromRemote`. */
final case class ForwardConnAck(connectData: ConnectData) extends Command
/** Result value of the promise in `DisconnectReceivedLocally`. */
case object ForwardDisconnect extends Command
/** Result value of the promise in `PingRespReceivedFromRemote`. */
case object ForwardPingResp extends Command
// State event handling
// String prefixes for consumer- and producer-related names.
// NOTE(review): presumably prepended to the names of spawned child actors; the use sites are
// outside this part of the file — confirm there.
private val ConsumerNamePrefix = "consumer-"
private val ProducerNamePrefix = "producer-"
/**
 * Behavior of the FSM while no connection is established.
 *
 *  - `ConnectReceivedLocally`: materializes a backpressured queue of `ForwardConnectCommand`s feeding a
 *    `BroadcastHub`, completes the event's `remote` promise with the hub's source, offers `ForwardConnect`
 *    to the queue and hands over to `QueueOfferState.waitForQueueOfferCompleted` with `serverConnect`
 *    as the next behavior.
 *  - `ConnectionLost`: ignored.
 *  - any other event: appended to the stash.
 *
 * @param data the current state data
 * @param mat  materializer used to run the queue/hub stream
 */
def disconnected(data: Disconnected)(implicit mat: Materializer): Behavior[Event] =
Behaviors
.receivePartial[Event] {
case (context, ConnectReceivedLocally(connectionId, connect, connectData, remote)) =>
// Queue side is kept in the state data; the source side is handed to the caller via the promise.
val (queue, source) = Source
.queue[ForwardConnectCommand](data.settings.clientSendBufferSize, OverflowStrategy.backpressure)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
remote.success(source)
// In both branches the new state data starts with an empty stash; the events stashed so far
// (data.stash) are passed to waitForQueueOfferCompleted below instead.
val nextState =
if (connect.connectFlags.contains(ConnectFlags.CleanSession)) {
// Clean session: stop every child actor and start with no consumers, producers or pending publications.
context.children.foreach(context.stop)
serverConnect(
ConnectReceived(
connectionId,
connect,
connectData,
queue,
Vector.empty,
Map.empty,
Map.empty,
Vector.empty,
Vector.empty,
data.consumerPacketRouter,
data.producerPacketRouter,
data.subscriberPacketRouter,
data.unsubscriberPacketRouter,
data.settings))
} else {
// Session is kept: tell each active producer about the connect and carry the existing
// consumers, producers and pending publications over.
data.activeProducers.values.foreach { producer =>
producer ! Producer.ReceiveConnect
}
serverConnect(
ConnectReceived(
connectionId,
connect,
connectData,
queue,
Vector.empty,
data.activeConsumers,
data.activeProducers,
data.pendingLocalPublications,
data.pendingRemotePublications,
data.consumerPacketRouter,
data.producerPacketRouter,
data.subscriberPacketRouter,
data.unsubscriberPacketRouter,
data.settings))
}
// The offer's result is turned into a QueueOfferCompleted event for this connection.
// NOTE(review): how QueueOfferState treats nextState and the stash is defined elsewhere.
QueueOfferState.waitForQueueOfferCompleted(
queue.offer(ForwardConnect),
result => QueueOfferCompleted(connectionId, result.toEither),
nextState,
data.stash)
case (_, ConnectionLost(_)) =>
// Already disconnected, so there is nothing to do.
Behaviors.same
case (_, e) =>
// Everything else is kept in the stash.
disconnected(data.copy(stash = data.stash :+ e))
}