in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala [693:743]
def apply(unsubscribeData: UnsubscribeData,
remote: Promise[ForwardUnsubscribe],
packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
settings: MqttSessionSettings): Behavior[Event] =
prepareServerUnsubscribe(Start(unsubscribeData, remote, packetRouter, settings))
// Our FSM data, FSM events and commands emitted by the FSM
sealed abstract class Data(val settings: MqttSessionSettings)
final case class Start(unsubscribeData: UnsubscribeData,
remote: Promise[ForwardUnsubscribe],
packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
override val settings: MqttSessionSettings)
extends Data(settings)
final case class ServerUnsubscribe(packetId: PacketId,
unsubscribeData: UnsubscribeData,
packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
override val settings: MqttSessionSettings)
extends Data(settings)
sealed abstract class Event
final case class AcquiredPacketId(packetId: PacketId) extends Event
final case object UnobtainablePacketId extends Event
final case class UnsubAckReceivedFromRemote(local: Promise[ForwardUnsubAck]) extends Event
case object ReceiveUnsubAckTimeout extends Event
sealed abstract class Command
final case class ForwardUnsubscribe(packetId: PacketId) extends Command
final case class ForwardUnsubAck(connectData: UnsubscribeData) extends Command
// State event handling
def prepareServerUnsubscribe(data: Start): Behavior[Event] = Behaviors.setup { context =>
val reply = Promise[LocalPacketRouter.Registered]()
data.packetRouter ! LocalPacketRouter.Register(context.self, reply)
import context.executionContext
reply.future.onComplete {
case Success(registered: LocalPacketRouter.Registered) => context.self ! AcquiredPacketId(registered.packetId)
case Failure(_) => context.self ! UnobtainablePacketId
}
Behaviors.receiveMessagePartial[Event] {
case AcquiredPacketId(packetId) =>
data.remote.success(ForwardUnsubscribe(packetId))
serverUnsubscribe(
ServerUnsubscribe(packetId, data.unsubscribeData, data.packetRouter, data.settings))
case UnobtainablePacketId =>
data.remote.failure(UnsubscribeFailed)
throw UnsubscribeFailed
}
}