in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala [784:845]
def apply(clientId: String,
packetId: PacketId,
local: Promise[ForwardSubscribe.type],
subscribed: Promise[Done],
packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
settings: MqttSessionSettings): Behavior[Event] =
preparePublisher(Start(Some(clientId), packetId, local, subscribed, packetRouter, settings))
// Our FSM data, FSM events and commands emitted by the FSM
sealed abstract class Data(val clientId: Some[String],
val packetId: PacketId,
val subscribed: Promise[Done],
val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
val settings: MqttSessionSettings)
final case class Start(override val clientId: Some[String],
override val packetId: PacketId,
local: Promise[ForwardSubscribe.type],
override val subscribed: Promise[Done],
override val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
override val settings: MqttSessionSettings)
extends Data(clientId, packetId, subscribed, packetRouter, settings)
final case class ServerSubscribe(override val clientId: Some[String],
override val packetId: PacketId,
override val subscribed: Promise[Done],
override val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
override val settings: MqttSessionSettings)
extends Data(clientId, packetId, subscribed, packetRouter, settings)
sealed abstract class Event
final case object RegisteredPacketId extends Event
final case object UnobtainablePacketId extends Event
final case class SubAckReceivedLocally(remote: Promise[ForwardSubAck.type]) extends Event
case object ReceiveSubAckTimeout extends Event
sealed abstract class Command
case object ForwardSubscribe extends Command
case object ForwardSubAck extends Command
// State event handling
def preparePublisher(data: Start): Behavior[Event] = Behaviors.setup { context =>
val reply = Promise[RemotePacketRouter.Registered.type]()
data.packetRouter ! RemotePacketRouter.Register(context.self.unsafeUpcast, data.clientId, data.packetId, reply)
import context.executionContext
reply.future.onComplete {
case Success(RemotePacketRouter.Registered) => context.self ! RegisteredPacketId
case Failure(_) => context.self ! UnobtainablePacketId
}
Behaviors.receiveMessagePartial[Event] {
case RegisteredPacketId =>
data.local.success(ForwardSubscribe)
serverSubscribe(
ServerSubscribe(data.clientId, data.packetId, data.subscribed, data.packetRouter, data.settings))
case UnobtainablePacketId =>
data.local.failure(SubscribeFailed)
data.subscribed.failure(SubscribeFailed)
throw SubscribeFailed
}
}