in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala [610:660]
/**
 * Creates the initial behavior for a subscription.
 *
 * The arguments are bundled into a [[Start]] value and handed to
 * [[prepareServerSubscribe]], which is the first state.
 *
 * @param subscribeData the subscription data carried through the states
 * @param remote        promise completed with a [[ForwardSubscribe]] once a packet id has
 *                      been acquired, or failed if none can be obtained
 * @param packetRouter  router this actor registers with to acquire a packet id
 * @param settings      session settings made available to every state
 * @return the behavior of the first state
 */
def apply(subscribeData: SubscribeData,
    remote: Promise[ForwardSubscribe],
    packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
    settings: MqttSessionSettings): Behavior[Event] = {
  val initialData = Start(subscribeData, remote, packetRouter, settings)
  prepareServerSubscribe(initialData)
}
// Our FSM data, FSM events and commands emitted by the FSM
/**
 * Base type of the data carried by each state of this FSM.
 *
 * @param settings the session settings, exposed by every state's data
 */
sealed abstract class Data(val settings: MqttSessionSettings)
/**
 * Data of the initial state, consumed by `prepareServerSubscribe`.
 *
 * @param subscribeData the subscription data, passed on unchanged to the next state
 * @param remote        promise completed with a [[ForwardSubscribe]] when a packet id is
 *                      acquired, or failed with `SubscribeFailed` when none can be obtained
 * @param packetRouter  router the actor registers with in order to acquire a packet id
 * @param settings      the session settings
 */
final case class Start(subscribeData: SubscribeData,
remote: Promise[ForwardSubscribe],
packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
override val settings: MqttSessionSettings)
extends Data(settings)
/**
 * Data of the state entered once a packet id has been acquired.
 *
 * @param packetId      the packet id obtained from the packet router
 * @param subscribeData the subscription data carried over from [[Start]]
 * @param packetRouter  the packet router carried over from [[Start]]
 * @param settings      the session settings
 */
final case class ServerSubscribe(packetId: PacketId,
subscribeData: SubscribeData,
packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
override val settings: MqttSessionSettings)
extends Data(settings)
/** Messages this actor's behaviors receive. */
sealed abstract class Event
/** Sent by the actor to itself when registration with the packet router yielded `packetId`. */
final case class AcquiredPacketId(packetId: PacketId) extends Event
/** Sent by the actor to itself when registration with the packet router failed. */
final case object UnobtainablePacketId extends Event
/**
 * Carries a promise for a [[ForwardSubAck]].
 * NOTE(review): presumably signals that the remote's SUBACK arrived; the handler is not in this section - confirm.
 */
final case class SubAckReceivedFromRemote(local: Promise[ForwardSubAck]) extends Event
// NOTE(review): presumably fired when waiting for the SUBACK times out; the handler is not in this section - confirm.
case object ReceiveSubAckTimeout extends Event
/** Values this FSM hands back to its callers by completing their promises. */
sealed abstract class Command
/** Completes the `remote` promise of [[Start]] once `packetId` has been acquired. */
final case class ForwardSubscribe(packetId: PacketId) extends Command
/**
 * Result type of the promise carried by [[SubAckReceivedFromRemote]].
 * NOTE(review): the field is named `connectData` but holds a `SubscribeData`; it looks like a
 * leftover name, but renaming it would change the public interface (named arguments, `copy`).
 */
final case class ForwardSubAck(connectData: SubscribeData) extends Command
// State event handling
/**
 * First state: acquire a packet id for the subscription.
 *
 * On setup the actor registers itself with the packet router and turns the outcome of the
 * registration promise into a message to itself: [[AcquiredPacketId]] on success,
 * [[UnobtainablePacketId]] on failure.
 *
 * @param data the initial state data
 * @return a behavior that, on [[AcquiredPacketId]], completes `data.remote` with a
 *         [[ForwardSubscribe]] and moves on to `serverSubscribe`; on
 *         [[UnobtainablePacketId]] it fails `data.remote` and throws `SubscribeFailed`.
 *         Other events are not handled by this state.
 */
def prepareServerSubscribe(data: Start): Behavior[Event] = Behaviors.setup { context =>
  val selfRef = context.self
  val registration = Promise[LocalPacketRouter.Registered]()

  // Send the registration request first, then hook the callback onto its reply.
  data.packetRouter ! LocalPacketRouter.Register(selfRef, registration)

  // The callback runs on the actor's execution context, outside message processing,
  // so it only forwards the outcome to the actor as a message.
  registration.future.onComplete {
    case Success(registered: LocalPacketRouter.Registered) =>
      selfRef ! AcquiredPacketId(registered.packetId)
    case Failure(_) =>
      selfRef ! UnobtainablePacketId
  }(context.executionContext)

  Behaviors.receiveMessagePartial[Event] {
    case UnobtainablePacketId =>
      data.remote.failure(SubscribeFailed)
      throw SubscribeFailed
    case AcquiredPacketId(packetId) =>
      data.remote.success(ForwardSubscribe(packetId))
      val nextData =
        ServerSubscribe(packetId, data.subscribeData, data.packetRouter, data.settings)
      serverSubscribe(nextData)
  }
}