def clientConnected()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ServerState.scala [467:701]


  /**
   * Behavior used while a client is connected.
   *
   * Handlers return either `clientConnected(...)` with the same or an updated copy of `data`,
   * a `QueueOfferState` behavior that is handed `clientConnected(data)` to continue with,
   * or a transition to `disconnect` / `clientConnect`.
   *
   * @param data connected-session state. As used below it carries the original `connect` packet,
   *             `remote` (to which outgoing commands are offered), the subscribed topic filters
   *             (`publishers`), the active consumer and producer actors keyed by topic name, the
   *             pending remote/local publications, the packet routers and the settings.
   * @param mat  materializer; needed implicitly by `runForeach` on the producers' command sources
   * @return the behavior handling events and signals for a connected client
   */
  def clientConnected(data: ConnAckReplied)(implicit mat: Materializer): Behavior[Event] = Behaviors.withTimers {
    timer =>
      val ReceivePingreq = "receive-pingreq"
      // A keep-alive of zero (or less) means no timeout is armed. Otherwise a single-shot timer is
      // started for 1.5 times the keep-alive; when it fires `ReceivePingReqTimeout` is handled below.
      // NOTE(review): this set-up is part of `clientConnected` itself, so it presumably runs again
      // (re-arming the timer under the same key) whenever a handler returns `clientConnected(...)`,
      // i.e. on any handled event and not only on PINGREQ - confirm this is intended.
      if (data.connect.keepAlive.toMillis > 0)
        timer.startSingleTimer(ReceivePingreq,
          ReceivePingReqTimeout,
          FiniteDuration((data.connect.keepAlive.toMillis * 1.5).toLong, TimeUnit.MILLISECONDS))

      Behaviors
        .receivePartial[Event] {
          // Remote subscribe: spawn and watch an anonymous Publisher child. The subscribed topics are
          // not recorded yet; that happens when the `subscribed` promise completes successfully and
          // `Subscribed` is sent back to this actor.
          case (context, SubscribeReceivedFromRemote(subscribe, local)) =>
            val subscribed = Promise[Done]()
            context.watch(
              context.spawnAnonymous(
                Publisher(data.connect.clientId,
                  subscribe.packetId,
                  local,
                  subscribed,
                  data.publisherPacketRouter,
                  data.settings)))
            subscribed.future.foreach(_ => context.self ! Subscribed(subscribe))(context.executionContext)
            clientConnected(data)
          // Subscription completed: add the first element of each `topicFilters` entry to `publishers`.
          case (_, Subscribed(subscribe)) =>
            clientConnected(
              data.copy(
                publishers = data.publishers ++ subscribe.topicFilters.map(_._1)))
          // Remote unsubscribe: same shape as the subscribe case, using an Unpublisher child and
          // the `Unsubscribed` self-message.
          case (context, UnsubscribeReceivedFromRemote(unsubscribe, local)) =>
            val unsubscribed = Promise[Done]()
            context.watch(
              context.spawnAnonymous(
                Unpublisher(data.connect.clientId,
                  unsubscribe.packetId,
                  local,
                  unsubscribed,
                  data.unpublisherPacketRouter,
                  data.settings)))
            unsubscribed.future.foreach(_ => context.self ! Unsubscribed(unsubscribe))(context.executionContext)
            clientConnected(data)
          // Unsubscription completed: remove the topic filters from `publishers`.
          case (_, Unsubscribed(unsubscribe)) =>
            clientConnected(data.copy(publishers = data.publishers -- unsubscribe.topicFilters))
          // Remote publish with none of the `QoSReserved` flag bits set (presumably QoS 0 - confirm):
          // `local` is completed immediately and no consumer actor is involved.
          case (_, PublishReceivedFromRemote(publish, local))
              if (publish.flags & ControlPacketFlags.QoSReserved).underlying == 0 =>
            local.success(Consumer.ForwardPublish)
            clientConnected(data)
          // Remote publish carrying a packet id. `activeConsumers` holds at most one consumer per
          // topic name, so the outcome depends on whether one is already registered for this topic.
          case (context, prfr @ PublishReceivedFromRemote(publish @ Publish(_, topicName, Some(packetId), _), local)) =>
            data.activeConsumers.get(topicName) match {
              case None =>
                // No consumer for this topic: spawn, watch and register one. The current child count
                // is appended to the actor name (presumably to keep names distinct - confirm).
                val consumerName = ActorName.mkName(ConsumerNamePrefix + topicName + "-" + context.children.size)
                val consumer =
                  context.spawn(Consumer(publish,
                    Some(data.connect.clientId),
                    packetId,
                    local,
                    data.consumerPacketRouter,
                    data.settings),
                    consumerName)
                context.watch(consumer)
                clientConnected(data.copy(activeConsumers = data.activeConsumers + (publish.topicName -> consumer)))
              case Some(consumer) if publish.flags.contains(ControlPacketFlags.DUP) =>
                // A publish flagged DUP is passed to the consumer already active for the topic.
                consumer ! Consumer.DupPublishReceivedFromRemote(local)
                clientConnected(data)
              case Some(_) =>
                // A consumer is busy with this topic: queue the event until `ConsumerFree` arrives.
                clientConnected(
                  data.copy(pendingRemotePublications = data.pendingRemotePublications :+ (publish.topicName -> prfr)))
            }
          // Sent to this actor by the `Terminated` signal handler below when the consumer registered
          // for `topicName` has stopped.
          case (context, ConsumerFree(topicName)) =>
            val i = data.pendingRemotePublications.indexWhere(_._1 == topicName)
            if (i >= 0) {
              // Start a consumer for the first pending publication of this topic and drop that
              // single entry (index i) from the pending list.
              val prfr = data.pendingRemotePublications(i)._2
              val consumerName = ActorName.mkName(ConsumerNamePrefix + topicName + "-" + context.children.size)
              // `packetId.get`: within this behavior, entries are only appended by the case above,
              // whose pattern requires `Some(packetId)`.
              // NOTE(review): confirm no other state adds entries without a packet id.
              val consumer = context.spawn(
                Consumer(prfr.publish,
                  Some(data.connect.clientId),
                  prfr.publish.packetId.get,
                  prfr.local,
                  data.consumerPacketRouter,
                  data.settings),
                consumerName)
              context.watch(consumer)
              clientConnected(
                data.copy(
                  activeConsumers = data.activeConsumers + (topicName -> consumer),
                  pendingRemotePublications =
                    data.pendingRemotePublications.take(i) ++ data.pendingRemotePublications.drop(i + 1)))
            } else {
              // Nothing pending for this topic: just deregister the consumer.
              clientConnected(data.copy(activeConsumers = data.activeConsumers - topicName))
            }
          // Local publish with none of the `QoSReserved` bits set, on a topic matched by at least one
          // subscribed filter: offered straight to `remote` without a packet id. Local publishes that
          // match no filter in `publishers` are not matched by any case of this partial function.
          // `QueueOfferState.waitForQueueOfferCompleted` is given `clientConnected(data)` as the
          // behavior to continue with (presumably once the offer has completed - confirm).
          case (context, PublishReceivedLocally(publish, _))
              if (publish.flags & ControlPacketFlags.QoSReserved).underlying == 0 &&
              data.publishers.exists(Topics.filter(_, publish.topicName)) =>
            QueueOfferState.waitForQueueOfferCompleted(
              data.remote
                .offer(ForwardPublish(publish, None)),
              result => QueueOfferCompleted(result.toEither),
              clientConnected(data),
              stash = Vector.empty)

          // Any other local publish on a subscribed topic goes through a Producer child;
          // `activeProducers` holds at most one producer per topic name.
          case (context, prl @ PublishReceivedLocally(publish, publishData))
              if data.publishers.exists(Topics.filter(_, publish.topicName)) =>
            // NOTE(review): the name is computed even when the else-branch below does not use it.
            val producerName = ActorName.mkName(ProducerNamePrefix + publish.topicName + "-" + context.children.size)
            if (!data.activeProducers.contains(publish.topicName)) {
              // The producer completes `reply` with a source of commands; each command is sent
              // back to this actor as `ReceivedProducerPublishingCommand`.
              val reply = Promise[Source[Producer.ForwardPublishingCommand, NotUsed]]()
              import context.executionContext
              reply.future.foreach {
                _.runForeach(command => context.self ! ReceivedProducerPublishingCommand(command))
              }
              val producer =
                context.spawn(Producer(publish, publishData, reply, data.producerPacketRouter, data.settings),
                  producerName)
              context.watch(producer)
              clientConnected(data.copy(activeProducers = data.activeProducers + (publish.topicName -> producer)))
            } else {
              // A producer is busy with this topic: queue the event until `ProducerFree` arrives.
              clientConnected(
                data.copy(pendingLocalPublications = data.pendingLocalPublications :+ (publish.topicName -> prl)))
            }
          // Sent to this actor by the `Terminated` signal handler below when the producer registered
          // for `topicName` has stopped. Mirrors the `ConsumerFree` handling.
          case (context, ProducerFree(topicName)) =>
            val i = data.pendingLocalPublications.indexWhere(_._1 == topicName)
            if (i >= 0) {
              val prl = data.pendingLocalPublications(i)._2
              val producerName = ActorName.mkName(ProducerNamePrefix + topicName + "-" + context.children.size)
              val reply = Promise[Source[Producer.ForwardPublishingCommand, NotUsed]]()
              import context.executionContext
              reply.future.foreach {
                _.runForeach(command => context.self ! ReceivedProducerPublishingCommand(command))
              }
              val producer = context.spawn(
                Producer(prl.publish, prl.publishData, reply, data.producerPacketRouter, data.settings),
                producerName)
              context.watch(producer)
              clientConnected(
                data.copy(
                  activeProducers = data.activeProducers + (topicName -> producer),
                  pendingLocalPublications =
                    data.pendingLocalPublications.take(i) ++ data.pendingLocalPublications.drop(i + 1)))
            } else {
              // Nothing pending for this topic: just deregister the producer.
              clientConnected(data.copy(activeProducers = data.activeProducers - topicName))
            }
          // A command emitted by a producer: translate it into the matching command for `remote`
          // and offer it there.
          case (context, ReceivedProducerPublishingCommand(command)) =>
            val eventualResult = command match {
              case Producer.ForwardPublish(publish, packetId) =>
                data.remote
                  .offer(ForwardPublish(publish, packetId))
              case Producer.ForwardPubRel(_, packetId) =>
                data.remote
                  .offer(ForwardPubRel(packetId))
            }

            QueueOfferState.waitForQueueOfferCompleted(
              eventualResult,
              result => QueueOfferCompleted(result.toEither),
              clientConnected(data),
              stash = Vector.empty)
          // Ping request from the remote: complete `local` and offer a ping response to `remote`.
          case (context, PingReqReceivedFromRemote(local)) =>
            local.success(ForwardPingReq)

            QueueOfferState.waitForQueueOfferCompleted(
              data.remote
                .offer(ForwardPingResp),
              result => QueueOfferCompleted(result.toEither),
              clientConnected(data),
              stash = Vector.empty)

          // The keep-alive timer armed at the top fired: fail `remote` with `PingFailed` and disconnect.
          case (context, ReceivePingReqTimeout) =>
            data.remote.fail(ServerConnector.PingFailed)
            timer.cancel(ReceivePingreq)
            disconnect(context, data.remote, data)
          // Disconnect requested by the remote.
          case (context, DisconnectReceivedFromRemote(local)) =>
            local.success(ForwardDisconnect)
            timer.cancel(ReceivePingreq)
            disconnect(context, data.remote, data)
          // Connection lost: cancel the timer and disconnect.
          case (context, ClientConnection.ConnectionLost) =>
            timer.cancel(ReceivePingreq)
            disconnect(context, data.remote, data)
          // A new connect with the CleanSession flag: stop every child, complete `remote` and go to
          // `clientConnect` with empty collections, keeping only the packet routers and settings.
          case (context, ConnectReceivedFromRemote(connect, local))
              if connect.connectFlags.contains(ConnectFlags.CleanSession) =>
            context.children.foreach(context.stop)
            timer.cancel(ReceivePingreq)
            data.remote.complete()
            clientConnect(
              ConnectReceived(
                connect,
                local,
                Vector.empty,
                Set.empty,
                Map.empty,
                Map.empty,
                Vector.empty,
                Vector.empty,
                data.consumerPacketRouter,
                data.producerPacketRouter,
                data.publisherPacketRouter,
                data.unpublisherPacketRouter,
                data.settings))
          // A new connect without CleanSession: children are left running and the subscriptions,
          // active consumers/producers and pending publications are carried over. Only the third
          // argument is reset to empty (presumably a stash - confirm against `ConnectReceived`).
          case (_, ConnectReceivedFromRemote(connect, local)) =>
            timer.cancel(ReceivePingreq)
            data.remote.complete()
            clientConnect(
              ConnectReceived(
                connect,
                local,
                Vector.empty,
                data.publishers,
                data.activeConsumers,
                data.activeProducers,
                data.pendingLocalPublications,
                data.pendingRemotePublications,
                data.consumerPacketRouter,
                data.producerPacketRouter,
                data.publisherPacketRouter,
                data.unpublisherPacketRouter,
                data.settings))
        }
        .receiveSignal {
          // A watched child failed with a subscribe/unsubscribe failure: fail `remote` with the same
          // cause and disconnect.
          // NOTE(review): the children spawned in this behavior are `Publisher`/`Unpublisher`;
          // confirm they actually fail with these `Subscriber`/`Unsubscriber` values.
          case (context, ChildFailed(_, failure))
              if failure == Subscriber.SubscribeFailed ||
              failure == Unsubscriber.UnsubscribeFailed =>
            data.remote.fail(failure)
            disconnect(context, data.remote, data)
          // Any other watched child stopped: if it was the registered consumer or producer of a topic,
          // tell this actor so the topic is freed or the next pending publication is started.
          case (context, t: Terminated) =>
            data.activeConsumers.find(_._2 == t.ref) match {
              case Some((topic, _)) =>
                context.self ! ConsumerFree(topic)
              case None =>
                data.activeProducers.find(_._2 == t.ref) match {
                  case Some((topic, _)) =>
                    context.self ! ProducerFree(topic)
                  // Neither a registered consumer nor producer (e.g. a Publisher/Unpublisher child):
                  // nothing to free.
                  case None =>
                }
            }
            Behaviors.same
          // This actor is stopping: complete `remote`.
          case (_, PostStop) =>
            data.remote.complete()
            Behaviors.same
        }
  }