in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala [393:590]
/**
 * Behavior of the client connector while the connection described by `data` is established.
 *
 * Every handled event ends by either disconnecting or re-entering this method, so the
 * `resetPingReqTimer` flag decides, per event, whether the `SendPingReqTimeout` timer is
 * started again.
 *
 * @param data state of the current connection (remote queue, packet routers, active and
 *             pending publications, keep-alive settings)
 * @param resetPingReqTimer when `true` and `data.keepAlive` is longer than zero milliseconds,
 *                          a single `SendPingReqTimeout` is scheduled after `data.keepAlive`;
 *                          when `false` this method does not touch the timer
 * @param mat materializer used to run the stream that relays producer commands to this actor
 * @return the behavior that handles events and signals for the connected state
 */
def serverConnected(data: ConnAckReceived,
    resetPingReqTimer: Boolean = true)(implicit mat: Materializer): Behavior[Event] =
  Behaviors.withTimers { timer =>
    val PingReqTimerKey = "send-pingreq"

    // True when none of the bits selected by the QoSReserved mask are set on the publish.
    def hasNoQoSFlags(publish: Publish): Boolean =
      (publish.flags & ControlPacketFlags.QoSReserved).underlying == 0

    // Child actor names: prefix, topic and the current number of children.
    def consumerName(topicName: String, childCount: Int): String =
      ActorName.mkName(s"${ConsumerNamePrefix}${topicName}-${childCount}")

    def producerName(topicName: String, childCount: Int): String =
      ActorName.mkName(s"${ProducerNamePrefix}${topicName}-${childCount}")

    // Creates the promise handed to a Producer and starts a stream that passes every command
    // emitted by the promised source to `forward`, wrapped as ReceivedProducerPublishingCommand.
    def producerReply(forward: Event => Unit): Promise[Source[Producer.ForwardPublishingCommand, NotUsed]] = {
      val reply = Promise[Source[Producer.ForwardPublishingCommand, NotUsed]]()
      Source
        .futureSource(reply.future)
        .runForeach(command => forward(ReceivedProducerPublishingCommand(command)))
      reply
    }

    if (resetPingReqTimer && data.keepAlive.toMillis > 0) {
      timer.startSingleTimer(PingReqTimerKey, SendPingReqTimeout(data.connectionId), data.keepAlive)
    }

    Behaviors
      .receivePartial[Event] {
        // A local connect for another connection id: re-queue it to self, then disconnect.
        case (ctx, reconnect @ ConnectReceivedLocally(otherId, _, _, _)) if otherId != data.connectionId =>
          ctx.self ! reconnect
          disconnect(ctx, data.remote, data)

        // Any other event carrying a non-empty connection id that is not ours is ignored.
        case (_, other) if other.connectionId.nonEmpty && other.connectionId != data.connectionId =>
          Behaviors.same

        case (ctx, ConnectionLost(_)) =>
          timer.cancel(PingReqTimerKey)
          disconnect(ctx, data.remote, data)

        case (ctx, DisconnectReceivedLocally(_, remote)) =>
          remote.success(ForwardDisconnect)
          timer.cancel(PingReqTimerKey)
          disconnect(ctx, data.remote, data)

        case (ctx, SubscribeReceivedLocally(_, _, subscribeData, remote)) =>
          val subscriber =
            ctx.spawnAnonymous(Subscriber(subscribeData, remote, data.subscriberPacketRouter, data.settings))
          ctx.watch(subscriber)
          serverConnected(data, resetPingReqTimer = true)

        case (ctx, UnsubscribeReceivedLocally(_, _, unsubscribeData, remote)) =>
          val unsubscriber =
            ctx.spawnAnonymous(Unsubscriber(unsubscribeData, remote, data.unsubscriberPacketRouter, data.settings))
          ctx.watch(unsubscriber)
          serverConnected(data, resetPingReqTimer = true)

        // Remote publish without QoS bits: forwarded directly, no consumer actor involved.
        case (_, PublishReceivedFromRemote(_, publish, local)) if hasNoQoSFlags(publish) =>
          local.success(Consumer.ForwardPublish)
          serverConnected(data, resetPingReqTimer = false)

        // Remote publish that carries a packet id: at most one consumer per topic is active.
        case (ctx,
              received @ PublishReceivedFromRemote(_, publish @ Publish(_, topicName, Some(packetId), _), local)) =>
          data.activeConsumers.get(topicName) match {
            case Some(consumer) if publish.flags.contains(ControlPacketFlags.DUP) =>
              consumer ! Consumer.DupPublishReceivedFromRemote(local)
              serverConnected(data, resetPingReqTimer = false)
            case Some(_) =>
              // The topic is busy: queue the publish until ConsumerFree arrives for it.
              val queued = data.pendingRemotePublications :+ (topicName -> received)
              serverConnected(data.copy(pendingRemotePublications = queued), resetPingReqTimer = false)
            case None =>
              val name = consumerName(topicName, ctx.children.size)
              val consumer =
                ctx.spawn(Consumer(publish, None, packetId, local, data.consumerPacketRouter, data.settings), name)
              ctx.watch(consumer)
              serverConnected(data.copy(activeConsumers = data.activeConsumers + (topicName -> consumer)),
                resetPingReqTimer = false)
          }

        case (ctx, ConsumerFree(topicName)) =>
          // Split the queue at the first pending publish for this topic, if there is one.
          val (earlier, fromMatch) = data.pendingRemotePublications.span(_._1 != topicName)
          fromMatch.headOption match {
            case Some((_, queued)) =>
              val name = consumerName(topicName, ctx.children.size)
              val consumer = ctx.spawn(
                Consumer(
                  queued.publish,
                  None,
                  // Within this behavior, publishes are only queued after matching Some(packetId).
                  queued.publish.packetId.get,
                  queued.local,
                  data.consumerPacketRouter,
                  data.settings),
                name)
              ctx.watch(consumer)
              serverConnected(
                data.copy(
                  activeConsumers = data.activeConsumers + (topicName -> consumer),
                  pendingRemotePublications = earlier ++ fromMatch.tail),
                resetPingReqTimer = true)
            case None =>
              serverConnected(data.copy(activeConsumers = data.activeConsumers - topicName),
                resetPingReqTimer = true)
          }

        // Local publish without QoS bits: offered to the remote queue directly.
        case (_, PublishReceivedLocally(publish, _)) if hasNoQoSFlags(publish) =>
          QueueOfferState.waitForQueueOfferCompleted(
            data.remote.offer(ForwardPublish(publish, None)),
            offered => QueueOfferCompleted(ByteString.empty, offered.toEither),
            serverConnected(data, resetPingReqTimer = true),
            stash = Vector.empty)

        // Any other local publish: at most one producer per topic is active.
        case (ctx, queued @ PublishReceivedLocally(publish, publishData)) =>
          val name = producerName(publish.topicName, ctx.children.size)
          if (data.activeProducers.contains(publish.topicName)) {
            // The topic is busy: queue the publish until ProducerFree arrives for it.
            val pending = data.pendingLocalPublications :+ (publish.topicName -> queued)
            serverConnected(data.copy(pendingLocalPublications = pending), resetPingReqTimer = true)
          } else {
            val reply = producerReply(event => ctx.self ! event)
            val producer =
              ctx.spawn(Producer(publish, publishData, reply, data.producerPacketRouter, data.settings), name)
            ctx.watch(producer)
            serverConnected(data.copy(activeProducers = data.activeProducers + (publish.topicName -> producer)),
              resetPingReqTimer = true)
          }

        case (ctx, ProducerFree(topicName)) =>
          // Split the queue at the first pending publish for this topic, if there is one.
          val (earlier, fromMatch) = data.pendingLocalPublications.span(_._1 != topicName)
          fromMatch.headOption match {
            case Some((_, queued)) =>
              val name = producerName(topicName, ctx.children.size)
              val reply = producerReply(event => ctx.self ! event)
              val producer = ctx.spawn(
                Producer(queued.publish, queued.publishData, reply, data.producerPacketRouter, data.settings),
                name)
              ctx.watch(producer)
              serverConnected(
                data.copy(
                  activeProducers = data.activeProducers + (topicName -> producer),
                  pendingLocalPublications = earlier ++ fromMatch.tail),
                resetPingReqTimer = true)
            case None =>
              serverConnected(data.copy(activeProducers = data.activeProducers - topicName),
                resetPingReqTimer = true)
          }

        // Commands relayed from a producer's command source are offered to the remote queue.
        case (_, ReceivedProducerPublishingCommand(Producer.ForwardPublish(publish, packetId))) =>
          QueueOfferState.waitForQueueOfferCompleted(
            data.remote.offer(ForwardPublish(publish, packetId)),
            offered => QueueOfferCompleted(ByteString.empty, offered.toEither),
            serverConnected(data, resetPingReqTimer = false),
            stash = Vector.empty)

        case (_, ReceivedProducerPublishingCommand(Producer.ForwardPubRel(_, packetId))) =>
          QueueOfferState.waitForQueueOfferCompleted(
            data.remote.offer(ForwardPubRel(packetId)),
            offered => QueueOfferCompleted(ByteString.empty, offered.toEither),
            serverConnected(data, resetPingReqTimer = false),
            stash = Vector.empty)

        // The timer fired while the previous ping is still unanswered: fail and disconnect.
        case (ctx, SendPingReqTimeout(_)) if data.pendingPingResp =>
          data.remote.fail(PingFailed)
          timer.cancel(PingReqTimerKey)
          disconnect(ctx, data.remote, data)

        // The timer fired with no ping outstanding: send a ping and remember a response is due.
        case (_, SendPingReqTimeout(_)) =>
          QueueOfferState.waitForQueueOfferCompleted(
            data.remote.offer(ForwardPingReq),
            offered => QueueOfferCompleted(ByteString.empty, offered.toEither),
            serverConnected(data.copy(pendingPingResp = true), resetPingReqTimer = true),
            stash = Vector.empty)

        case (_, PingRespReceivedFromRemote(_, local)) =>
          local.success(ForwardPingResp)
          serverConnected(data.copy(pendingPingResp = false), resetPingReqTimer = true)
      }
      .receiveSignal {
        case (ctx, ChildFailed(_, failure))
            if failure == Subscriber.SubscribeFailed || failure == Unsubscriber.UnsubscribeFailed =>
          data.remote.fail(failure)
          disconnect(ctx, data.remote, data)

        case (ctx, terminated: Terminated) =>
          // Tell ourselves which topic was freed; consumers are looked up before producers.
          val freed: Option[Event] =
            data.activeConsumers
              .collectFirst { case (topic, consumer) if consumer == terminated.ref => ConsumerFree(topic) }
              .orElse(data.activeProducers.collectFirst {
                case (topic, producer) if producer == terminated.ref => ProducerFree(topic)
              })
          freed.foreach(event => ctx.self ! event)
          serverConnected(data, resetPingReqTimer = true)

        case (_, PostStop) =>
          data.remote.complete()
          Behaviors.same
      }
  }