in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala [241:311]
/**
 * Builds the initial behavior for this state machine.
 *
 * The arguments are bundled into a [[Start]] value, which is then handed to
 * [[prepareClientConsumption]]; the behavior returned by that call is the result.
 */
def apply(publish: Publish,
    clientId: Option[String],
    packetId: PacketId,
    local: Promise[ForwardPublish.type],
    packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
    settings: MqttSessionSettings): Behavior[Event] = {
  val initialData = Start(publish, clientId, packetId, local, packetRouter, settings)
  prepareClientConsumption(initialData)
}
// Our FSM data, FSM events and commands emitted by the FSM
/**
 * Base type of the data carried by the states of this FSM.
 *
 * Every concrete data type exposes the publish being handled, the optional client id, the
 * packet id, a reference to the remote packet router and the MQTT session settings.
 */
sealed abstract class Data(val publish: Publish,
val clientId: Option[String],
val packetId: PacketId,
val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
val settings: MqttSessionSettings)
/**
 * Data for the initial state handled by `prepareClientConsumption`.
 *
 * In addition to the fields shared through [[Data]] it carries `local`, a promise that
 * `prepareClientConsumption` completes with `ForwardPublish` once the packet id has been
 * registered, or fails with `ConsumeFailed` when registration is not possible.
 */
final case class Start(override val publish: Publish,
override val clientId: Option[String],
override val packetId: PacketId,
local: Promise[ForwardPublish.type],
override val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
override val settings: MqttSessionSettings)
extends Data(publish, clientId, packetId, packetRouter, settings)
/**
 * Data handed to `consumeUnacknowledged` after the packet id has been registered.
 *
 * Carries exactly the fields shared through [[Data]]; unlike [[Start]] it has no `local`
 * promise, since `prepareClientConsumption` completes that promise before this value is built.
 */
final case class ClientConsuming(override val publish: Publish,
override val clientId: Option[String],
override val packetId: PacketId,
override val packetRouter: ActorRef[RemotePacketRouter.Request[Event]],
override val settings: MqttSessionSettings)
extends Data(publish, clientId, packetId, packetRouter, settings)
/** Messages handled by this FSM. */
sealed abstract class Event
// Outcome of the packet id registration; `prepareClientConsumption` sends one of these two
// events to itself once the registration promise completes.
final case object RegisteredPacketId extends Event
final case object UnobtainablePacketId extends Event
// The events below are handled by states that are not all visible in this excerpt.
// Each non-timeout event carries a promise typed with the `Forward*` command it relates to.
// NOTE(review): judging by the names, "Locally" events originate from the local client and
// "FromRemote" events from the remote peer - confirm against the senders.
final case class PubAckReceivedLocally(remote: Promise[ForwardPubAck.type]) extends Event
final case class PubRecReceivedLocally(remote: Promise[ForwardPubRec.type]) extends Event
// NOTE(review): presumably raised when no PubAck/PubRec arrives in time - confirm where it is scheduled.
case object ReceivePubAckRecTimeout extends Event
final case class PubRelReceivedFromRemote(local: Promise[ForwardPubRel.type]) extends Event
// NOTE(review): presumably raised when no PubRel arrives in time - confirm where it is scheduled.
case object ReceivePubRelTimeout extends Event
final case class PubCompReceivedLocally(remote: Promise[ForwardPubComp.type]) extends Event
// NOTE(review): presumably raised when no PubComp arrives in time - confirm where it is scheduled.
case object ReceivePubCompTimeout extends Event
final case class DupPublishReceivedFromRemote(local: Promise[ForwardPublish.type]) extends Event
/**
 * Commands emitted by this FSM.
 *
 * The singleton types of these objects are used as the result types of the promises carried
 * by [[Event]] messages and by [[Start]]; for example `prepareClientConsumption` completes
 * its `local` promise with `ForwardPublish`.
 */
sealed abstract class Command
case object ForwardPublish extends Command
case object ForwardPubAck extends Command
case object ForwardPubRec extends Command
case object ForwardPubRel extends Command
case object ForwardPubComp extends Command
// State event handling
/**
 * Initial state: registers this actor with the packet router for the publish's packet id and
 * waits for the outcome.
 *
 * On setup a `RemotePacketRouter.Register` request (carrying a fresh promise) is sent to
 * `data.packetRouter`. When that promise completes, the outcome is turned into an event sent
 * to this actor:
 *
 *  - `RegisteredPacketId`: `data.local` is completed with `ForwardPublish` and the behavior
 *    becomes `consumeUnacknowledged` with a [[ClientConsuming]] built from `data`.
 *  - `UnobtainablePacketId`: `data.local` is failed with `ConsumeFailed(data.publish)` and the
 *    same exception is thrown from the behavior.
 *
 * Any other event is left unhandled by the partial receive.
 *
 * @param data the initial state data, including the `local` promise to complete
 * @return the behavior awaiting the registration outcome
 */
def prepareClientConsumption(data: Start): Behavior[Event] = Behaviors.setup { context =>
  // The promise travels with the request; its completion is observed below.
  val registration = Promise[RemotePacketRouter.Registered.type]()
  data.packetRouter ! RemotePacketRouter.Register(context.self.unsafeUpcast,
    data.clientId,
    data.packetId,
    registration)

  // Translate the registration outcome into an event and deliver it to this actor.
  registration.future.onComplete { outcome =>
    val event: Event = outcome match {
      case Success(RemotePacketRouter.Registered) => RegisteredPacketId
      case Failure(_)                             => UnobtainablePacketId
    }
    context.self ! event
  }(context.executionContext)

  Behaviors.receiveMessagePartial[Event] {
    case UnobtainablePacketId =>
      // Report the failure to the waiting party first, then fail this actor with the same cause.
      val consumeFailure = ConsumeFailed(data.publish)
      data.local.failure(consumeFailure)
      throw consumeFailure
    case RegisteredPacketId =>
      data.local.success(ForwardPublish)
      val consuming =
        ClientConsuming(data.publish, data.clientId, data.packetId, data.packetRouter, data.settings)
      consumeUnacknowledged(consuming)
  }
}