def apply()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala [241:311]


  /**
   * Creates the behavior that starts in the [[prepareClientConsumption]] state.
   *
   * All arguments are bundled into a [[Start]] value which becomes the initial state data.
   *
   * @param publish      the publish packet this behavior is created for
   * @param clientId     the optional client id passed on when registering the packet id
   * @param packetId     the packet id to register with the packet router
   * @param local        promise completed with [[ForwardPublish]] once registration succeeds,
   *                     or failed when the packet id cannot be obtained
   * @param packetRouter the router the packet id is registered with
   * @param settings     the session settings carried along in the state data
   * @return the initial behavior of the FSM
   */
  def apply(publish: Publish,
      clientId: Option[String],
      packetId: PacketId,
      local: Promise[ForwardPublish.type],
      packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
      settings: MqttSessionSettings): Behavior[Event] = {
    val initialData = Start(publish, clientId, packetId, local, packetRouter, settings)
    prepareClientConsumption(initialData)
  }

  // Our FSM data, FSM events and commands emitted by the FSM

  /**
   * Base class of the data carried by the states of this FSM.
   *
   * Every state keeps the publish packet together with the client id, the packet id,
   * the packet router reference and the session settings.
   */
  sealed abstract class Data(val publish: Publish,
      val clientId: Option[String],
      val packetId: PacketId,
      val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
      val settings: MqttSessionSettings)
  /**
   * State data of [[prepareClientConsumption]].
   *
   * In addition to the common [[Data]] fields it carries `local`, the promise that is
   * completed with [[ForwardPublish]] once the packet id has been registered, or failed
   * when the packet id cannot be obtained.
   */
  final case class Start(override val publish: Publish,
      override val clientId: Option[String],
      override val packetId: PacketId,
      local: Promise[ForwardPublish.type],
      override val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
      override val settings: MqttSessionSettings)
      extends Data(publish, clientId, packetId, packetRouter, settings)
  /**
   * State data used after the packet id has been registered; it is built from the
   * [[Start]] data (without the `local` promise) and handed to `consumeUnacknowledged`.
   */
  final case class ClientConsuming(override val publish: Publish,
      override val clientId: Option[String],
      override val packetId: PacketId,
      override val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
      override val settings: MqttSessionSettings)
      extends Data(publish, clientId, packetId, packetRouter, settings)

  /**
   * Events (messages) handled by this FSM.
   *
   * Events that carry a promise expect it to be completed with the [[Command]] named in
   * the promise's type parameter.
   */
  sealed abstract class Event
  // Sent by the actor to itself when the packet router confirmed the packet id registration.
  final case object RegisteredPacketId extends Event
  // Sent by the actor to itself when the packet id registration failed.
  final case object UnobtainablePacketId extends Event
  final case class PubAckReceivedLocally(remote: Promise[ForwardPubAck.type]) extends Event
  final case class PubRecReceivedLocally(remote: Promise[ForwardPubRec.type]) extends Event
  // NOTE(review): presumably raised when neither a PubAck nor a PubRec arrives in time - confirm against the state handling it.
  case object ReceivePubAckRecTimeout extends Event
  final case class PubRelReceivedFromRemote(local: Promise[ForwardPubRel.type]) extends Event
  // NOTE(review): presumably raised when no PubRel arrives in time - confirm against the state handling it.
  case object ReceivePubRelTimeout extends Event
  final case class PubCompReceivedLocally(remote: Promise[ForwardPubComp.type]) extends Event
  // NOTE(review): presumably raised when no PubComp arrives in time - confirm against the state handling it.
  case object ReceivePubCompTimeout extends Event
  final case class DupPublishReceivedFromRemote(local: Promise[ForwardPublish.type]) extends Event

  /**
   * Commands emitted by the FSM.
   *
   * Each command is a singleton whose type is used as the result type of the promises
   * carried by the state data and events (e.g. `Promise[ForwardPublish.type]`).
   */
  sealed abstract class Command
  case object ForwardPublish extends Command
  case object ForwardPubAck extends Command
  case object ForwardPubRec extends Command
  case object ForwardPubRel extends Command
  case object ForwardPubComp extends Command

  // State event handling

  /**
   * Initial state: registers this actor and its packet id with the packet router and waits
   * for the outcome.
   *
   * The router's reply promise is translated into a message to this actor:
   *  - on [[RegisteredPacketId]] the `local` promise is completed with [[ForwardPublish]] and
   *    the behavior becomes `consumeUnacknowledged`;
   *  - on [[UnobtainablePacketId]] the `local` promise is failed with `ConsumeFailed` and the
   *    same exception is thrown from the message handler.
   *
   * Any other event is left unhandled in this state.
   *
   * @param data the initial state data, including the `local` promise to complete
   * @return the behavior awaiting the registration outcome
   */
  def prepareClientConsumption(data: Start): Behavior[Event] = Behaviors.setup { context =>
    val registration = Promise[RemotePacketRouter.Registered.type]()
    val selfRef = context.self

    data.packetRouter ! RemotePacketRouter.Register(
      selfRef.unsafeUpcast,
      data.clientId,
      data.packetId,
      registration)

    // Turn the router's reply into an event delivered to this actor's mailbox.
    registration.future.onComplete { outcome =>
      val event: Event = outcome match {
        case Success(RemotePacketRouter.Registered) => RegisteredPacketId
        case Failure(_)                             => UnobtainablePacketId
      }
      selfRef ! event
    }(context.executionContext)

    Behaviors.receiveMessagePartial[Event] {
      case RegisteredPacketId =>
        // Let the caller forward the publish before moving on to the next state.
        data.local.success(ForwardPublish)
        val consuming =
          ClientConsuming(data.publish, data.clientId, data.packetId, data.packetRouter, data.settings)
        consumeUnacknowledged(consuming)
      case UnobtainablePacketId =>
        val failure = ConsumeFailed(data.publish)
        data.local.failure(failure)
        throw failure
    }

  }