def apply()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala [693:743]


  def apply(unsubscribeData: UnsubscribeData,
      remote: Promise[ForwardUnsubscribe],
      packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
      settings: MqttSessionSettings): Behavior[Event] =
    prepareServerUnsubscribe(Start(unsubscribeData, remote, packetRouter, settings))

  // Our FSM data, FSM events and commands emitted by the FSM

  sealed abstract class Data(val settings: MqttSessionSettings)
  final case class Start(unsubscribeData: UnsubscribeData,
      remote: Promise[ForwardUnsubscribe],
      packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
      override val settings: MqttSessionSettings)
      extends Data(settings)
  final case class ServerUnsubscribe(packetId: PacketId,
      unsubscribeData: UnsubscribeData,
      packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
      override val settings: MqttSessionSettings)
      extends Data(settings)

  sealed abstract class Event
  final case class AcquiredPacketId(packetId: PacketId) extends Event
  final case object UnobtainablePacketId extends Event
  final case class UnsubAckReceivedFromRemote(local: Promise[ForwardUnsubAck]) extends Event
  case object ReceiveUnsubAckTimeout extends Event

  sealed abstract class Command
  final case class ForwardUnsubscribe(packetId: PacketId) extends Command
  final case class ForwardUnsubAck(connectData: UnsubscribeData) extends Command

  // State event handling

  def prepareServerUnsubscribe(data: Start): Behavior[Event] = Behaviors.setup { context =>
    val reply = Promise[LocalPacketRouter.Registered]()
    data.packetRouter ! LocalPacketRouter.Register(context.self, reply)
    import context.executionContext
    reply.future.onComplete {
      case Success(registered: LocalPacketRouter.Registered) => context.self ! AcquiredPacketId(registered.packetId)
      case Failure(_)                                        => context.self ! UnobtainablePacketId
    }

    Behaviors.receiveMessagePartial[Event] {
      case AcquiredPacketId(packetId) =>
        data.remote.success(ForwardUnsubscribe(packetId))
        serverUnsubscribe(
          ServerUnsubscribe(packetId, data.unsubscribeData, data.packetRouter, data.settings))
      case UnobtainablePacketId =>
        data.remote.failure(UnsubscribeFailed)
        throw UnsubscribeFailed
    }
  }