in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala [467:701]
/**
 * Builds the behavior that handles events for a client connection once its
 * state is a [[ConnAckReplied]].
 *
 * The behavior is a state machine: almost every branch ends by calling
 * `clientConnected` again with the (possibly updated) `data`, or by handing
 * over to `clientConnect` / `disconnect`. It manages:
 *  - subscribe / unsubscribe requests received from the remote side,
 *  - publications received from the remote side (at most one active consumer
 *    child per topic name, further ones are queued),
 *  - publications received locally (at most one active producer child per
 *    topic name, further ones are queued),
 *  - ping requests, disconnects, connection loss and re-connects,
 *  - child termination signals, which free the corresponding topic.
 *
 * @param data the current connection state (connect packet, remote queue,
 *             known publishers, active children, pending publications,
 *             packet routers and settings)
 * @param mat  materializer required implicitly; in this block it is needed by
 *             the `runForeach` calls that drain a producer's command source
 * @return the behavior handling [[Event]]s for this state
 */
def clientConnected(data: ConnAckReplied)(implicit mat: Materializer): Behavior[Event] = Behaviors.withTimers {
timer =>
// Key under which the keep-alive timer is registered with `timer`.
val ReceivePingreq = "receive-pingreq"
// A keep-alive of zero (or less) disables the timeout. Otherwise a single
// ReceivePingReqTimeout is scheduled after 1.5 x the keep-alive announced in
// the connect packet (presumably following the MQTT keep-alive rule).
// NOTE(review): this setup runs every time clientConnected(...) is invoked,
// i.e. also after locally originated events; per the TimerScheduler contract a
// timer started with an existing key replaces the previous one, so every
// re-entry effectively restarts the timeout - confirm that this is intended.
if (data.connect.keepAlive.toMillis > 0)
timer.startSingleTimer(ReceivePingreq,
ReceivePingReqTimeout,
FiniteDuration((data.connect.keepAlive.toMillis * 1.5).toLong, TimeUnit.MILLISECONDS))
// Case order below matters: guarded cases must precede the more general
// patterns for the same event type.
Behaviors
.receivePartial[Event] {
// Subscribe request from the remote side: spawn a watched, anonymous
// Publisher child and hand it the `subscribed` promise. Only when that
// promise completes successfully is Subscribed sent to this actor; the
// state itself is left unchanged here.
case (context, SubscribeReceivedFromRemote(subscribe, local)) =>
val subscribed = Promise[Done]()
context.watch(
context.spawnAnonymous(
Publisher(data.connect.clientId,
subscribe.packetId,
local,
subscribed,
data.publisherPacketRouter,
data.settings)))
subscribed.future.foreach(_ => context.self ! Subscribed(subscribe))(context.executionContext)
clientConnected(data)
// The subscription went through: remember the first component of every
// topic-filter entry in `publishers`.
case (_, Subscribed(subscribe)) =>
clientConnected(
data.copy(
publishers = data.publishers ++ subscribe.topicFilters.map(_._1)))
// Unsubscribe request from the remote side: mirrors the subscribe branch
// using an Unpublisher child and the `unsubscribed` promise.
case (context, UnsubscribeReceivedFromRemote(unsubscribe, local)) =>
val unsubscribed = Promise[Done]()
context.watch(
context.spawnAnonymous(
Unpublisher(data.connect.clientId,
unsubscribe.packetId,
local,
unsubscribed,
data.unpublisherPacketRouter,
data.settings)))
unsubscribed.future.foreach(_ => context.self ! Unsubscribed(unsubscribe))(context.executionContext)
clientConnected(data)
// The unsubscription went through: drop its topic filters from `publishers`.
case (_, Unsubscribed(unsubscribe)) =>
clientConnected(data.copy(publishers = data.publishers -- unsubscribe.topicFilters))
// Remote publish with none of the QoSReserved bits set (presumably QoS 0):
// complete `local` right away, no consumer child is involved.
case (_, PublishReceivedFromRemote(publish, local))
if (publish.flags & ControlPacketFlags.QoSReserved).underlying == 0 =>
local.success(Consumer.ForwardPublish)
clientConnected(data)
// Remote publish that carries a packet id: handled by a Consumer child, with
// at most one active consumer per topic name.
case (context, prfr @ PublishReceivedFromRemote(publish @ Publish(_, topicName, Some(packetId), _), local)) =>
data.activeConsumers.get(topicName) match {
case None =>
// No consumer for this topic yet: spawn a named one, watch it and record it.
// The name suffix is the number of children at the time of spawning.
val consumerName = ActorName.mkName(ConsumerNamePrefix + topicName + "-" + context.children.size)
val consumer =
context.spawn(Consumer(publish,
Some(data.connect.clientId),
packetId,
local,
data.consumerPacketRouter,
data.settings),
consumerName)
context.watch(consumer)
clientConnected(data.copy(activeConsumers = data.activeConsumers + (publish.topicName -> consumer)))
case Some(consumer) if publish.flags.contains(ControlPacketFlags.DUP) =>
// A publish flagged DUP is passed on to the already active consumer.
consumer ! Consumer.DupPublishReceivedFromRemote(local)
clientConnected(data)
case Some(_) =>
// A consumer is busy with this topic: queue the event until ConsumerFree.
clientConnected(
data.copy(pendingRemotePublications = data.pendingRemotePublications :+ (publish.topicName -> prfr)))
}
// A consumer for `topicName` has terminated (sent from the Terminated signal
// handler below): start the first queued remote publication for that topic,
// or, if none is queued, forget the topic's active consumer.
case (context, ConsumerFree(topicName)) =>
val i = data.pendingRemotePublications.indexWhere(_._1 == topicName)
if (i >= 0) {
val prfr = data.pendingRemotePublications(i)._2
val consumerName = ActorName.mkName(ConsumerNamePrefix + topicName + "-" + context.children.size)
// NOTE(review): `packetId.get` relies on entries being queued only by the
// branch above that matched `Some(packetId)` - within this block that is the
// only place that appends to pendingRemotePublications.
val consumer = context.spawn(
Consumer(prfr.publish,
Some(data.connect.clientId),
prfr.publish.packetId.get,
prfr.local,
data.consumerPacketRouter,
data.settings),
consumerName)
context.watch(consumer)
// Replace the topic's consumer and remove exactly the entry at index `i`.
clientConnected(
data.copy(
activeConsumers = data.activeConsumers + (topicName -> consumer),
pendingRemotePublications =
data.pendingRemotePublications.take(i) ++ data.pendingRemotePublications.drop(i + 1)))
} else {
clientConnected(data.copy(activeConsumers = data.activeConsumers - topicName))
}
// Local publish with none of the QoSReserved bits set (presumably QoS 0)
// whose topic matches one of the filters in `publishers`: offer it straight
// to the remote queue without a packet id and wait for the offer result.
// clientConnected(data) is passed as the follow-up behavior; the exact
// semantics of QueueOfferState are defined outside this block.
// A local publish matching no filter in `publishers` matches neither this
// case nor the next one and is therefore not handled by this partial function.
case (context, PublishReceivedLocally(publish, _))
if (publish.flags & ControlPacketFlags.QoSReserved).underlying == 0 &&
data.publishers.exists(Topics.filter(_, publish.topicName)) =>
QueueOfferState.waitForQueueOfferCompleted(
data.remote
.offer(ForwardPublish(publish, None)),
result => QueueOfferCompleted(result.toEither),
clientConnected(data),
stash = Vector.empty)
// Any other local publish matching a filter: handled by a Producer child,
// with at most one active producer per topic name.
case (context, prl @ PublishReceivedLocally(publish, publishData))
if data.publishers.exists(Topics.filter(_, publish.topicName)) =>
val producerName = ActorName.mkName(ProducerNamePrefix + publish.topicName + "-" + context.children.size)
if (!data.activeProducers.contains(publish.topicName)) {
// The producer receives `reply`; once it is completed with a source, every
// command emitted by that source is sent back to this actor wrapped in
// ReceivedProducerPublishingCommand.
val reply = Promise[Source[Producer.ForwardPublishingCommand, NotUsed]]()
import context.executionContext
reply.future.foreach {
_.runForeach(command => context.self ! ReceivedProducerPublishingCommand(command))
}
val producer =
context.spawn(Producer(publish, publishData, reply, data.producerPacketRouter, data.settings),
producerName)
context.watch(producer)
clientConnected(data.copy(activeProducers = data.activeProducers + (publish.topicName -> producer)))
} else {
// A producer is busy with this topic: queue the event until ProducerFree.
clientConnected(
data.copy(pendingLocalPublications = data.pendingLocalPublications :+ (publish.topicName -> prl)))
}
// A producer for `topicName` has terminated (sent from the Terminated signal
// handler below): start the first queued local publication for that topic,
// or, if none is queued, forget the topic's active producer.
case (context, ProducerFree(topicName)) =>
val i = data.pendingLocalPublications.indexWhere(_._1 == topicName)
if (i >= 0) {
val prl = data.pendingLocalPublications(i)._2
val producerName = ActorName.mkName(ProducerNamePrefix + topicName + "-" + context.children.size)
val reply = Promise[Source[Producer.ForwardPublishingCommand, NotUsed]]()
import context.executionContext
reply.future.foreach {
_.runForeach(command => context.self ! ReceivedProducerPublishingCommand(command))
}
val producer = context.spawn(
Producer(prl.publish, prl.publishData, reply, data.producerPacketRouter, data.settings),
producerName)
context.watch(producer)
// Replace the topic's producer and remove exactly the entry at index `i`.
clientConnected(
data.copy(
activeProducers = data.activeProducers + (topicName -> producer),
pendingLocalPublications =
data.pendingLocalPublications.take(i) ++ data.pendingLocalPublications.drop(i + 1)))
} else {
clientConnected(data.copy(activeProducers = data.activeProducers - topicName))
}
// A command emitted by a producer's source: translate it into the matching
// forward command, offer that to the remote queue and wait for the result.
case (context, ReceivedProducerPublishingCommand(command)) =>
val eventualResult = command match {
case Producer.ForwardPublish(publish, packetId) =>
data.remote
.offer(ForwardPublish(publish, packetId))
case Producer.ForwardPubRel(_, packetId) =>
data.remote
.offer(ForwardPubRel(packetId))
}
QueueOfferState.waitForQueueOfferCompleted(
eventualResult,
result => QueueOfferCompleted(result.toEither),
clientConnected(data),
stash = Vector.empty)
// Ping request from the remote side: complete `local` first, then offer the
// ping response to the remote queue and wait for the offer result.
case (context, PingReqReceivedFromRemote(local)) =>
local.success(ForwardPingReq)
QueueOfferState.waitForQueueOfferCompleted(
data.remote
.offer(ForwardPingResp),
result => QueueOfferCompleted(result.toEither),
clientConnected(data),
stash = Vector.empty)
// The keep-alive timer fired: fail the remote queue with PingFailed and
// disconnect.
case (context, ReceivePingReqTimeout) =>
data.remote.fail(ServerConnector.PingFailed)
timer.cancel(ReceivePingreq)
disconnect(context, data.remote, data)
// Disconnect requested by the remote side: complete `local`, stop the
// keep-alive timer and disconnect.
case (context, DisconnectReceivedFromRemote(local)) =>
local.success(ForwardDisconnect)
timer.cancel(ReceivePingreq)
disconnect(context, data.remote, data)
// The connection was lost: stop the keep-alive timer and disconnect.
case (context, ClientConnection.ConnectionLost) =>
timer.cancel(ReceivePingreq)
disconnect(context, data.remote, data)
// Re-connect with the CleanSession flag: stop all children, stop the
// keep-alive timer, complete the current remote queue and continue in
// clientConnect with empty collections; only the packet routers and the
// settings are carried over.
case (context, ConnectReceivedFromRemote(connect, local))
if connect.connectFlags.contains(ConnectFlags.CleanSession) =>
context.children.foreach(context.stop)
timer.cancel(ReceivePingreq)
data.remote.complete()
clientConnect(
ConnectReceived(
connect,
local,
Vector.empty,
Set.empty,
Map.empty,
Map.empty,
Vector.empty,
Vector.empty,
data.consumerPacketRouter,
data.producerPacketRouter,
data.publisherPacketRouter,
data.unpublisherPacketRouter,
data.settings))
// Re-connect without CleanSession: children keep running; publishers, active
// consumers/producers and both pending queues are carried over to
// clientConnect along with the routers and settings.
case (_, ConnectReceivedFromRemote(connect, local)) =>
timer.cancel(ReceivePingreq)
data.remote.complete()
clientConnect(
ConnectReceived(
connect,
local,
Vector.empty,
data.publishers,
data.activeConsumers,
data.activeProducers,
data.pendingLocalPublications,
data.pendingRemotePublications,
data.consumerPacketRouter,
data.producerPacketRouter,
data.publisherPacketRouter,
data.unpublisherPacketRouter,
data.settings))
}
.receiveSignal {
// A child failed with one of the two listed failures: fail the remote queue
// with that failure and disconnect. This case is listed before the general
// Terminated case so that it is tried first.
// NOTE(review): the children spawned above for subscribe/unsubscribe are
// Publisher/Unpublisher, while this guard compares against
// Subscriber.SubscribeFailed / Unsubscriber.UnsubscribeFailed - confirm these
// are the failures those children actually raise.
case (context, ChildFailed(_, failure))
if failure == Subscriber.SubscribeFailed ||
failure == Unsubscriber.UnsubscribeFailed =>
data.remote.fail(failure)
disconnect(context, data.remote, data)
// A watched child terminated: if it is the active consumer (checked first) or
// the active producer of some topic, notify this actor so that the topic is
// freed or its next pending publication is started. The state is unchanged here.
case (context, t: Terminated) =>
data.activeConsumers.find(_._2 == t.ref) match {
case Some((topic, _)) =>
context.self ! ConsumerFree(topic)
case None =>
data.activeProducers.find(_._2 == t.ref) match {
case Some((topic, _)) =>
context.self ! ProducerFree(topic)
case None =>
// Neither a tracked consumer nor a tracked producer: nothing to do.
}
}
Behaviors.same
// This actor is stopping: complete the remote queue.
case (_, PostStop) =>
data.remote.complete()
Behaviors.same
}
}