def apply()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala [62:150]


  /**
   * Builds the initial behavior of the state machine for a single publish.
   *
   * The arguments are bundled into a [[Start]] value which is then handed to
   * [[preparePublish]]; no other work happens here.
   *
   * @param publish      the publish packet to be forwarded
   * @param publishData  data carried alongside the publish; stored in the state unchanged
   * @param remote       promise that `preparePublish` completes with the source of
   *                     [[ForwardPublishingCommand]]s
   * @param packetRouter router that `preparePublish` sends its `Register` request to
   * @param settings     session settings kept in the state data
   * @param mat          materializer used by `preparePublish` to run the command stream
   * @return the behavior returned by `preparePublish`
   */
  def apply(publish: Publish,
      publishData: PublishData,
      remote: Promise[Source[ForwardPublishingCommand, NotUsed]],
      packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
      settings: MqttSessionSettings)(implicit mat: Materializer): Behavior[Event] = {
    val initialData = Start(publish, publishData, remote, packetRouter, settings)
    preparePublish(initialData)
  }

  // Our FSM data, FSM events and commands emitted by the FSM

  /**
   * Common base of the state data handed from one behavior of this state machine to the next.
   *
   * Every state exposes the publish packet, the data associated with it and the
   * session settings.
   */
  sealed abstract class Data(val publish: Publish, val publishData: PublishData, val settings: MqttSessionSettings)
  /**
   * State data consumed by `preparePublish`, i.e. before a packet id has been acquired.
   *
   * @param remote       promise that `preparePublish` completes with the source of
   *                     [[ForwardPublishingCommand]]s once the stream is materialized
   * @param packetRouter router that receives the `LocalPacketRouter.Register` request
   */
  final case class Start(override val publish: Publish,
      override val publishData: PublishData,
      remote: Promise[Source[ForwardPublishingCommand, NotUsed]],
      packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
      override val settings: MqttSessionSettings)
      extends Data(publish, publishData, settings)
  /**
   * State data used once a packet id has been acquired; `preparePublish` builds it and
   * passes it to `publishUnacknowledged`.
   *
   * @param remote       queue into which [[ForwardPublishingCommand]]s are offered
   * @param packetId     packet id delivered by [[AcquiredPacketId]]
   * @param packetRouter the router carried over from [[Start]]
   */
  final case class Publishing(remote: SourceQueueWithComplete[ForwardPublishingCommand],
      packetId: PacketId,
      override val publish: Publish,
      override val publishData: PublishData,
      packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
      override val settings: MqttSessionSettings)
      extends Data(publish, publishData, settings)

  /** Messages accepted by the behaviors of this state machine. */
  sealed abstract class Event
  /** Sent to self by `preparePublish` when the packet router's registration reply succeeds. */
  final case class AcquiredPacketId(packetId: PacketId) extends Event
  /** Sent to self by `preparePublish` when the registration reply fails; triggers a new request. */
  final case object UnacquiredPacketId extends Event
  // NOTE(review): presumably signals that no PUBACK/PUBREC arrived in time - handler is outside this view, confirm.
  case object ReceivePubAckRecTimeout extends Event
  // NOTE(review): presumably a PUBACK arrived from the remote; `local` looks like the promise to complete
  // with the command to forward - confirm against the handler.
  final case class PubAckReceivedFromRemote(local: Promise[ForwardPubAck]) extends Event
  // NOTE(review): presumably the PUBREC counterpart of PubAckReceivedFromRemote - confirm.
  final case class PubRecReceivedFromRemote(local: Promise[ForwardPubRec]) extends Event
  // NOTE(review): presumably signals that no PUBCOMP arrived in time - confirm.
  case object ReceivePubCompTimeout extends Event
  // NOTE(review): presumably a PUBCOMP arrived from the remote - confirm against the handler.
  final case class PubCompReceivedFromRemote(local: Promise[ForwardPubComp]) extends Event
  // NOTE(review): presumably notifies this actor of a (re)connect - not handled in the visible code, confirm.
  case object ReceiveConnect extends Event

  /**
   * Event carrying the outcome of a queue offer: `Left` holds the failure, `Right` the
   * [[QueueOfferResult]]. `preparePublish` builds it from the offer's result via `toEither`.
   *
   * Also mixes in `QueueOfferState.QueueOfferCompleted` so it can be used with
   * `QueueOfferState.waitForQueueOfferCompleted`.
   */
  final case class QueueOfferCompleted(result: Either[Throwable, QueueOfferResult])
      extends Event
      with QueueOfferState.QueueOfferCompleted

  /** Commands emitted by this state machine. */
  sealed abstract class Command
  /** Subset of commands that can be offered to the queue behind the `remote` source. */
  sealed abstract class ForwardPublishingCommand extends Command
  /** Offered to the queue by `preparePublish` once a packet id is available. */
  final case class ForwardPublish(publish: Publish, packetId: Option[PacketId]) extends ForwardPublishingCommand
  // NOTE(review): presumably the value used to complete PubAckReceivedFromRemote.local - confirm.
  final case class ForwardPubAck(publishData: PublishData) extends Command
  // NOTE(review): presumably the value used to complete PubRecReceivedFromRemote.local - confirm.
  final case class ForwardPubRec(publishData: PublishData) extends Command
  // NOTE(review): emitted outside the visible code; presumably follows a received PUBREC - confirm.
  final case class ForwardPubRel(publish: Publish, packetId: PacketId) extends ForwardPublishingCommand
  // NOTE(review): presumably the value used to complete PubCompReceivedFromRemote.local - confirm.
  final case class ForwardPubComp(publishData: PublishData) extends Command

  // State event handling

  /**
   * Initial state: obtains a packet id and sets up the outgoing command stream.
   *
   * On setup it
   *  1. asks `data.packetRouter` to register this actor, turning the reply into an
   *     [[AcquiredPacketId]] or [[UnacquiredPacketId]] message to self,
   *  1. materializes a queue-backed source that feeds a `BroadcastHub` sink, and
   *  1. completes `data.remote` with the resulting source.
   *
   * It then reacts to:
   *  - [[AcquiredPacketId]]: offers a [[ForwardPublish]] to the queue and hands over to
   *    `QueueOfferState.waitForQueueOfferCompleted`, with `publishUnacknowledged` as the
   *    follow-up behavior;
   *  - [[UnacquiredPacketId]]: issues the registration request again and stays in this state;
   *  - `PostStop`: completes the queue.
   *
   * @param data the initial state data
   * @param mat  materializer used to run the command stream
   * @return the behavior described above
   */
  def preparePublish(data: Start)(implicit mat: Materializer): Behavior[Event] = Behaviors.setup { context =>
    // Sends a Register request to the packet router and routes its reply back to this actor.
    def requestPacketId(): Unit = {
      import context.executionContext
      val registration = Promise[LocalPacketRouter.Registered]()
      data.packetRouter ! LocalPacketRouter.Register(context.self.unsafeUpcast, registration)
      registration.future.onComplete { outcome =>
        val notification: Event = outcome match {
          case Success(registered: LocalPacketRouter.Registered) => AcquiredPacketId(registered.packetId)
          case Failure(_)                                        => UnacquiredPacketId
        }
        context.self ! notification
      }
    }

    // The request goes out before the stream is materialized, as in the original ordering.
    requestPacketId()

    val commandGraph = Source
      .queue[ForwardPublishingCommand](data.settings.clientSendBufferSize, OverflowStrategy.backpressure)
      .toMat(BroadcastHub.sink)(Keep.both)
    val (commandQueue, commandSource) = commandGraph.run()

    data.remote.success(commandSource)

    Behaviors
      .receiveMessagePartial[Event] {
        case AcquiredPacketId(packetId) =>
          // Offer first; the next state's data and behavior are only built afterwards.
          val pendingOffer = commandQueue.offer(ForwardPublish(data.publish, Some(packetId)))
          val nextData =
            Publishing(commandQueue, packetId, data.publish, data.publishData, data.packetRouter, data.settings)
          QueueOfferState.waitForQueueOfferCompleted(
            pendingOffer,
            offerResult => QueueOfferCompleted(offerResult.toEither),
            publishUnacknowledged(nextData),
            stash = Vector.empty)

        case UnacquiredPacketId =>
          // Registration failed: ask again and remain in this state.
          requestPacketId()
          Behaviors.same
      }
      .receiveSignal {
        case (_, PostStop) =>
          commandQueue.complete()
          Behaviors.same
      }
  }