def apply()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala [610:660]


  def apply(subscribeData: SubscribeData,
      remote: Promise[ForwardSubscribe],
      packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
      settings: MqttSessionSettings): Behavior[Event] =
    prepareServerSubscribe(Start(subscribeData, remote, packetRouter, settings))

  // Our FSM data, FSM events and commands emitted by the FSM

  sealed abstract class Data(val settings: MqttSessionSettings)
  final case class Start(subscribeData: SubscribeData,
      remote: Promise[ForwardSubscribe],
      packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
      override val settings: MqttSessionSettings)
      extends Data(settings)
  final case class ServerSubscribe(packetId: PacketId,
      subscribeData: SubscribeData,
      packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
      override val settings: MqttSessionSettings)
      extends Data(settings)

  sealed abstract class Event
  final case class AcquiredPacketId(packetId: PacketId) extends Event
  final case object UnobtainablePacketId extends Event
  final case class SubAckReceivedFromRemote(local: Promise[ForwardSubAck]) extends Event
  case object ReceiveSubAckTimeout extends Event

  sealed abstract class Command
  final case class ForwardSubscribe(packetId: PacketId) extends Command
  final case class ForwardSubAck(connectData: SubscribeData) extends Command

  // State event handling

  def prepareServerSubscribe(data: Start): Behavior[Event] = Behaviors.setup { context =>
    val reply = Promise[LocalPacketRouter.Registered]()
    data.packetRouter ! LocalPacketRouter.Register(context.self, reply)
    import context.executionContext
    reply.future.onComplete {
      case Success(registered: LocalPacketRouter.Registered) => context.self ! AcquiredPacketId(registered.packetId)
      case Failure(_)                                        => context.self ! UnobtainablePacketId
    }

    Behaviors.receiveMessagePartial[Event] {
      case AcquiredPacketId(packetId) =>
        data.remote.success(ForwardSubscribe(packetId))
        serverSubscribe(
          ServerSubscribe(packetId, data.subscribeData, data.packetRouter, data.settings))
      case UnobtainablePacketId =>
        data.remote.failure(SubscribeFailed)
        throw SubscribeFailed
    }
  }