def apply()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala [784:845]


  /**
   * Creates the initial behavior of this FSM.
   *
   * The client id is wrapped in `Some` and, together with the remaining
   * arguments, packaged into a [[Start]] value that is handed to
   * `preparePublisher`.
   *
   * @param clientId     identifier of the client; passed on to the packet router registration
   * @param packetId     packet identifier to register with the packet router
   * @param local        promise completed with `ForwardSubscribe` once the packet id is registered
   * @param subscribed   promise failed with `SubscribeFailed` if the packet id cannot be registered
   * @param packetRouter router with which this actor registers itself for `packetId`
   * @param settings     session settings carried along in the FSM data
   * @return the behavior for the initial (registering) state
   */
  def apply(clientId: String,
      packetId: PacketId,
      local: Promise[ForwardSubscribe.type],
      subscribed: Promise[Done],
      packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
      settings: MqttSessionSettings): Behavior[Event] = {
    val initialData = Start(
      clientId = Some(clientId),
      packetId = packetId,
      local = local,
      subscribed = subscribed,
      packetRouter = packetRouter,
      settings = settings)
    preparePublisher(initialData)
  }

  // Our FSM data, FSM events and commands emitted by the FSM

  /**
   * Base class of the data carried between the states of this FSM.
   *
   * @param clientId     client identifier; typed as `Some` so it is always present
   * @param packetId     packet identifier this actor registers with the packet router
   * @param subscribed   promise that is failed with `SubscribeFailed` when the packet id
   *                     cannot be registered (successful completion is not visible in this
   *                     part of the file)
   * @param packetRouter router that receives the `Register` request for `packetId`
   * @param settings     session settings; carried through the states unchanged here
   */
  sealed abstract class Data(val clientId: Some[String],
      val packetId: PacketId,
      val subscribed: Promise[Done],
      val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
      val settings: MqttSessionSettings)
  /**
   * Data for the initial state handled by `preparePublisher`.
   *
   * In addition to the common [[Data]] fields it carries `local`, the promise that is
   * completed with `ForwardSubscribe` once the packet id has been registered, or failed
   * with `SubscribeFailed` when registration is not possible.
   */
  final case class Start(override val clientId: Some[String],
      override val packetId: PacketId,
      local: Promise[ForwardSubscribe.type],
      override val subscribed: Promise[Done],
      override val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
      override val settings: MqttSessionSettings)
      extends Data(clientId, packetId, subscribed, packetRouter, settings)
  /**
   * Data handed to the `serverSubscribe` state after the packet id has been registered.
   *
   * Carries exactly the common [[Data]] fields; the `local` promise of [[Start]] has
   * already been completed by the time this value is created.
   */
  final case class ServerSubscribe(override val clientId: Some[String],
      override val packetId: PacketId,
      override val subscribed: Promise[Done],
      override val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
      override val settings: MqttSessionSettings)
      extends Data(clientId, packetId, subscribed, packetRouter, settings)

  /** Messages this FSM actor accepts. */
  sealed abstract class Event
  // Sent by the actor to itself when the packet router confirmed the registration.
  final case object RegisteredPacketId extends Event
  // Sent by the actor to itself when the registration promise failed.
  final case object UnobtainablePacketId extends Event
  // Carries the promise to be completed with `ForwardSubAck`.
  // NOTE(review): handled outside the visible part of the file - presumably in `serverSubscribe`.
  final case class SubAckReceivedLocally(remote: Promise[ForwardSubAck.type]) extends Event
  // NOTE(review): by its name a timeout while waiting for the SubAck; the sender and the
  // handler are not visible here - confirm against `serverSubscribe`.
  case object ReceiveSubAckTimeout extends Event

  /** Commands emitted by this FSM; they are delivered by completing promises. */
  sealed abstract class Command
  // Value used to complete the `local` promise once the packet id is registered.
  case object ForwardSubscribe extends Command
  // Value type of the `remote` promise carried by `SubAckReceivedLocally`.
  case object ForwardSubAck extends Command

  // State event handling

  /**
   * Initial state: registers this actor with the packet router for `data.packetId`
   * and waits for the outcome.
   *
   * The outcome of the registration promise is translated into a message to this actor:
   *  - `RegisteredPacketId`: completes `data.local` with `ForwardSubscribe` and moves on
   *    to `serverSubscribe`.
   *  - `UnobtainablePacketId`: fails both `data.local` and `data.subscribed` with
   *    `SubscribeFailed` and then throws `SubscribeFailed` from the behavior.
   *
   * Any other event is left unhandled by the partial behavior.
   *
   * @param data the initial FSM data
   * @return the behavior for the registering state
   */
  def preparePublisher(data: Start): Behavior[Event] = Behaviors.setup { context =>
    val selfRef = context.self
    val registration = Promise[RemotePacketRouter.Registered.type]()

    data.packetRouter ! RemotePacketRouter.Register(selfRef.unsafeUpcast, data.clientId, data.packetId, registration)

    // The callback runs outside the actor, so it only forwards the outcome as a message.
    registration.future.onComplete {
      case Success(RemotePacketRouter.Registered) => selfRef ! RegisteredPacketId
      case Failure(_)                             => selfRef ! UnobtainablePacketId
    }(context.executionContext)

    Behaviors.receiveMessagePartial[Event] {
      case UnobtainablePacketId =>
        data.local.failure(SubscribeFailed)
        data.subscribed.failure(SubscribeFailed)
        throw SubscribeFailed
      case RegisteredPacketId =>
        data.local.success(ForwardSubscribe)
        val nextData = ServerSubscribe(
          clientId = data.clientId,
          packetId = data.packetId,
          subscribed = data.subscribed,
          packetRouter = data.packetRouter,
          settings = data.settings)
        serverSubscribe(nextData)
    }

  }