in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala [62:150]
/**
 * Builds the starting behavior for this FSM.
 *
 * The arguments are bundled into a [[Start]] value which is handed to `preparePublish`.
 */
def apply(publish: Publish,
    publishData: PublishData,
    remote: Promise[Source[ForwardPublishingCommand, NotUsed]],
    packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
    settings: MqttSessionSettings)(implicit mat: Materializer): Behavior[Event] = {
  val initialData = Start(publish, publishData, remote, packetRouter, settings)
  preparePublish(initialData)
}
// Our FSM data, FSM events and commands emitted by the FSM
/**
 * Base type for the data carried by each state of this FSM. Every state holds the publish
 * being processed, its associated publish data and the session settings.
 */
sealed abstract class Data(val publish: Publish, val publishData: PublishData, val settings: MqttSessionSettings)
/**
 * Data for the initial state, consumed by `preparePublish`.
 *
 * @param remote promise that `preparePublish` completes with the source of forwarded publishing commands
 * @param packetRouter router that `preparePublish` registers with in order to obtain a packet id
 */
final case class Start(override val publish: Publish,
override val publishData: PublishData,
remote: Promise[Source[ForwardPublishingCommand, NotUsed]],
packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
override val settings: MqttSessionSettings)
extends Data(publish, publishData, settings)
/**
 * Data held once a packet id has been acquired; `preparePublish` builds it and passes it on to
 * `publishUnacknowledged`.
 *
 * @param remote queue onto which publishing commands are offered
 * @param packetId packet id obtained from the packet router
 * @param packetRouter the packet router carried over from [[Start]]
 */
final case class Publishing(remote: SourceQueueWithComplete[ForwardPublishingCommand],
packetId: PacketId,
override val publish: Publish,
override val publishData: PublishData,
packetRouter: ActorRef[LocalPacketRouter.Request[Event]],
override val settings: MqttSessionSettings)
extends Data(publish, publishData, settings)
/** Messages understood by the behaviors of this FSM. */
sealed abstract class Event
// Sent to self by `preparePublish` when registration with the packet router succeeded.
final case class AcquiredPacketId(packetId: PacketId) extends Event
// Sent to self by `preparePublish` when registration with the packet router failed; it triggers a retry there.
final case object UnacquiredPacketId extends Event
// NOTE(review): by its name, a timeout while waiting for a PUBACK/PUBREC; handler is outside this excerpt - confirm.
case object ReceivePubAckRecTimeout extends Event
// NOTE(review): presumably a PUBACK arrived from the remote; `local` is a promise of a ForwardPubAck - confirm usage.
final case class PubAckReceivedFromRemote(local: Promise[ForwardPubAck]) extends Event
// NOTE(review): presumably a PUBREC arrived from the remote; `local` is a promise of a ForwardPubRec - confirm usage.
final case class PubRecReceivedFromRemote(local: Promise[ForwardPubRec]) extends Event
// NOTE(review): by its name, a timeout while waiting for a PUBCOMP; handler is outside this excerpt - confirm.
case object ReceivePubCompTimeout extends Event
// NOTE(review): presumably a PUBCOMP arrived from the remote; `local` is a promise of a ForwardPubComp - confirm usage.
final case class PubCompReceivedFromRemote(local: Promise[ForwardPubComp]) extends Event
// NOTE(review): by its name, signals that a connect was received; handler is outside this excerpt - confirm.
case object ReceiveConnect extends Event
// Carries the outcome of offering an element to the command queue: the failure on the left,
// the offer result on the right. `preparePublish` creates it from the offer's completed Try.
final case class QueueOfferCompleted(result: Either[Throwable, QueueOfferResult])
extends Event
with QueueOfferState.QueueOfferCompleted
/** Commands emitted by this FSM. */
sealed abstract class Command
/** Subset of commands that travel through the command queue/source created in `preparePublish`. */
sealed abstract class ForwardPublishingCommand extends Command
// Offered to the command queue by `preparePublish`, which supplies the acquired packet id as `Some(...)`.
final case class ForwardPublish(publish: Publish, packetId: Option[PacketId]) extends ForwardPublishingCommand
// Value type of the promise carried by PubAckReceivedFromRemote.
final case class ForwardPubAck(publishData: PublishData) extends Command
// Value type of the promise carried by PubRecReceivedFromRemote.
final case class ForwardPubRec(publishData: PublishData) extends Command
// NOTE(review): presumably emitted to send a PUBREL for `packetId`; the emitting code is outside this excerpt - confirm.
final case class ForwardPubRel(publish: Publish, packetId: PacketId) extends ForwardPublishingCommand
// Value type of the promise carried by PubCompReceivedFromRemote.
final case class ForwardPubComp(publishData: PublishData) extends Command
// State event handling
/**
 * Initial state: obtains a packet id for the publish and exposes the stream of outgoing commands.
 *
 * During setup a registration is sent to the packet router, a backpressured queue feeding a
 * broadcast hub is materialized, and `data.remote` is completed with the hub's source. When a
 * packet id arrives, the publish is offered to the queue and processing continues with
 * `publishUnacknowledged` once that offer has completed. A failed registration is retried.
 * The queue is completed when the actor stops.
 */
def preparePublish(data: Start)(implicit mat: Materializer): Behavior[Event] = Behaviors.setup { context =>
  // Asks the packet router for a packet id and feeds the outcome back to this actor as an event.
  def registerForPacketId(): Unit = {
    val registration = Promise[LocalPacketRouter.Registered]()
    data.packetRouter ! LocalPacketRouter.Register(context.self.unsafeUpcast, registration)
    import context.executionContext
    registration.future.onComplete { outcome =>
      val event: Event = outcome match {
        case Success(registered: LocalPacketRouter.Registered) => AcquiredPacketId(registered.packetId)
        case Failure(_)                                        => UnacquiredPacketId
      }
      context.self ! event
    }
  }

  registerForPacketId()

  // Queue side is kept here for offering commands; source side is handed to the caller via the promise.
  val hubGraph = Source
    .queue[ForwardPublishingCommand](data.settings.clientSendBufferSize, OverflowStrategy.backpressure)
    .toMat(BroadcastHub.sink)(Keep.both)
  val (commandQueue, commandSource) = hubGraph.run()
  data.remote.success(commandSource)

  Behaviors
    .receiveMessagePartial[Event] {
      case UnacquiredPacketId =>
        // Registration failed: ask again and stay in this state.
        registerForPacketId()
        Behaviors.same
      case AcquiredPacketId(packetId) =>
        val offered = commandQueue.offer(ForwardPublish(data.publish, Some(packetId)))
        val publishing =
          Publishing(commandQueue, packetId, data.publish, data.publishData, data.packetRouter, data.settings)
        QueueOfferState.waitForQueueOfferCompleted(
          offered,
          offerResult => QueueOfferCompleted(offerResult.toEither),
          publishUnacknowledged(publishing),
          stash = Vector.empty)
    }
    .receiveSignal {
      case (_, PostStop) =>
        commandQueue.complete()
        Behaviors.same
    }
}