def serverConnected()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/ClientState.scala [393:590]


  /**
   * Behavior for the connected state of the client connector, parameterised by the state of the
   * current connection.
   *
   * Handles local requests (connect, disconnect, subscribe, unsubscribe, publish), packets received
   * from the remote (publish, ping response), the bookkeeping events `ConsumerFree` / `ProducerFree`
   * and the keep-alive timeout. Events that carry a non-empty connection id different from
   * `data.connectionId` are ignored, except for a local connect request, which is re-sent to self
   * before this connection is disconnected.
   *
   * At most one consumer and one producer child is kept per topic name; further publications for a
   * busy topic are queued in `data.pendingRemotePublications` / `data.pendingLocalPublications` and
   * started when the corresponding `ConsumerFree` / `ProducerFree` event arrives.
   *
   * @param data state of the current connection (outbound `remote` queue, packet routers, settings,
   *   active children and pending publications)
   * @param resetPingReqTimer when true and `data.keepAlive` is positive, (re)arms the single
   *   keep-alive timer that delivers `SendPingReqTimeout` after `data.keepAlive`
   * @param mat materializer used to run the streams relaying producer commands back to this actor
   * @return the behavior to use while connected
   */
  def serverConnected(data: ConnAckReceived,
      resetPingReqTimer: Boolean = true)(implicit mat: Materializer): Behavior[Event] =
    Behaviors.withTimers { timer =>
      // Key of the keep-alive timer. Starting a timer under an existing key replaces the pending one,
      // which is how "resetting" the timer works here.
      val SendPingreq = "send-pingreq"
      if (resetPingReqTimer && data.keepAlive.toMillis > 0)
        timer.startSingleTimer(SendPingreq, SendPingReqTimeout(data.connectionId), data.keepAlive)

      Behaviors
        .receivePartial[Event] {
          // A local connect for another connection: re-queue it to self, then tear this connection down.
          case (context, connect @ ConnectReceivedLocally(connectionId, _, _, _))
              if connectionId != data.connectionId =>
            context.self ! connect
            // Cancel like every other disconnect path so no stale ping timeout fires afterwards.
            timer.cancel(SendPingreq)
            disconnect(context, data.remote, data)

          // Any other event addressed to a different connection is dropped.
          case (_, event) if event.connectionId.nonEmpty && event.connectionId != data.connectionId =>
            Behaviors.same

          case (context, ConnectionLost(_)) =>
            timer.cancel(SendPingreq)
            disconnect(context, data.remote, data)

          case (context, DisconnectReceivedLocally(_, remote)) =>
            remote.success(ForwardDisconnect)
            timer.cancel(SendPingreq)
            disconnect(context, data.remote, data)

          // Each subscribe / unsubscribe request is handled by its own watched, anonymous child.
          case (context, SubscribeReceivedLocally(_, _, subscribeData, remote)) =>
            context.watch(
              context.spawnAnonymous(Subscriber(subscribeData, remote, data.subscriberPacketRouter, data.settings)))
            serverConnected(data, resetPingReqTimer = true)

          case (context, UnsubscribeReceivedLocally(_, _, unsubscribeData, remote)) =>
            context.watch(
              context
                .spawnAnonymous(Unsubscriber(unsubscribeData, remote, data.unsubscriberPacketRouter, data.settings)))
            serverConnected(data, resetPingReqTimer = true)

          // Inbound publish with no bit of the QoSReserved mask set (presumably QoS 0): forward directly.
          // Inbound traffic does not re-arm the keep-alive timer.
          case (_, PublishReceivedFromRemote(_, publish, local))
              if (publish.flags & ControlPacketFlags.QoSReserved).underlying == 0 =>
            local.success(Consumer.ForwardPublish)
            serverConnected(data, resetPingReqTimer = false)

          // Inbound publish carrying a packet id: handled by one consumer child per topic.
          case (context,
                prfr @ PublishReceivedFromRemote(_, publish @ Publish(_, topicName, Some(packetId), _), local)) =>
            data.activeConsumers.get(topicName) match {
              case None =>
                // No consumer for this topic yet: spawn one and remember it.
                val consumerName = ActorName.mkName(ConsumerNamePrefix + topicName + "-" + context.children.size)
                val consumer =
                  context.spawn(Consumer(publish, None, packetId, local, data.consumerPacketRouter, data.settings),
                    consumerName)
                context.watch(consumer)
                serverConnected(data.copy(activeConsumers = data.activeConsumers + (publish.topicName -> consumer)),
                  resetPingReqTimer = false)

              case Some(consumer) if publish.flags.contains(ControlPacketFlags.DUP) =>
                // A duplicate delivery is handed to the consumer that is already active for the topic.
                consumer ! Consumer.DupPublishReceivedFromRemote(local)
                serverConnected(data, resetPingReqTimer = false)

              case Some(_) =>
                // Topic is busy: queue the publication until ConsumerFree arrives for it.
                serverConnected(
                  data.copy(pendingRemotePublications = data.pendingRemotePublications :+ (publish.topicName -> prfr)),
                  resetPingReqTimer = false)
            }

          // The consumer of a topic has finished: start the oldest queued publication for that topic,
          // or forget the topic when nothing is queued.
          // NOTE(review): this internal event re-arms the keep-alive timer; confirm that is intended.
          case (context, ConsumerFree(topicName)) =>
            val i = data.pendingRemotePublications.indexWhere(_._1 == topicName)
            if (i >= 0) {
              val prfr = data.pendingRemotePublications(i)._2
              val consumerName = ActorName.mkName(ConsumerNamePrefix + topicName + "-" + context.children.size)
              val consumer = context.spawn(
                Consumer(prfr.publish,
                  None,
                  // Entries are only queued from the pattern above that matched Some(packetId).
                  prfr.publish.packetId.get,
                  prfr.local,
                  data.consumerPacketRouter,
                  data.settings),
                consumerName)
              context.watch(consumer)
              serverConnected(
                data.copy(
                  activeConsumers = data.activeConsumers + (topicName -> consumer),
                  // Remove exactly the entry at index i, keeping the order of the others.
                  pendingRemotePublications =
                    data.pendingRemotePublications.take(i) ++ data.pendingRemotePublications.drop(i + 1)),
                resetPingReqTimer = true)
            } else {
              serverConnected(data.copy(activeConsumers = data.activeConsumers - topicName), resetPingReqTimer = true)
            }

          // Local publish with no bit of the QoSReserved mask set (presumably QoS 0): offer it to the
          // remote queue and resume this behavior once the offer has completed.
          case (_, PublishReceivedLocally(publish, _))
              if (publish.flags & ControlPacketFlags.QoSReserved).underlying == 0 =>
            QueueOfferState.waitForQueueOfferCompleted(
              data.remote.offer(ForwardPublish(publish, None)),
              result => QueueOfferCompleted(ByteString.empty, result.toEither),
              serverConnected(data, resetPingReqTimer = true),
              stash = Vector.empty)

          // Any other local publish: handled by one producer child per topic.
          case (context, prl @ PublishReceivedLocally(publish, publishData)) =>
            if (!data.activeProducers.contains(publish.topicName)) {
              val producerName =
                ActorName.mkName(ProducerNamePrefix + publish.topicName + "-" + context.children.size)
              val reply = Promise[Source[Producer.ForwardPublishingCommand, NotUsed]]()

              // Relay every command of the source that `reply` is completed with back to this actor.
              Source
                .futureSource(reply.future)
                .runForeach(msg => context.self ! ReceivedProducerPublishingCommand(msg))

              val producer =
                context.spawn(Producer(publish, publishData, reply, data.producerPacketRouter, data.settings),
                  producerName)
              context.watch(producer)
              serverConnected(data.copy(activeProducers = data.activeProducers + (publish.topicName -> producer)),
                resetPingReqTimer = true)
            } else {
              // Topic is busy: queue the publication until ProducerFree arrives for it.
              serverConnected(
                data.copy(pendingLocalPublications = data.pendingLocalPublications :+ (publish.topicName -> prl)),
                resetPingReqTimer = true)
            }

          // The producer of a topic has finished: start the oldest queued publication for that topic,
          // or forget the topic when nothing is queued.
          // NOTE(review): this internal event re-arms the keep-alive timer; confirm that is intended.
          case (context, ProducerFree(topicName)) =>
            val i = data.pendingLocalPublications.indexWhere(_._1 == topicName)
            if (i >= 0) {
              val prl = data.pendingLocalPublications(i)._2
              val producerName = ActorName.mkName(ProducerNamePrefix + topicName + "-" + context.children.size)
              val reply = Promise[Source[Producer.ForwardPublishingCommand, NotUsed]]()

              // Relay every command of the source that `reply` is completed with back to this actor.
              Source
                .futureSource(reply.future)
                .runForeach(msg => context.self ! ReceivedProducerPublishingCommand(msg))

              val producer = context.spawn(
                Producer(prl.publish, prl.publishData, reply, data.producerPacketRouter, data.settings),
                producerName)
              context.watch(producer)
              serverConnected(
                data.copy(
                  activeProducers = data.activeProducers + (topicName -> producer),
                  // Remove exactly the entry at index i, keeping the order of the others.
                  pendingLocalPublications =
                    data.pendingLocalPublications.take(i) ++ data.pendingLocalPublications.drop(i + 1)),
                resetPingReqTimer = true)
            } else {
              serverConnected(data.copy(activeProducers = data.activeProducers - topicName), resetPingReqTimer = true)
            }

          // Commands relayed from a producer child are offered to the remote queue; the keep-alive
          // timer is left untouched.
          case (_, ReceivedProducerPublishingCommand(Producer.ForwardPublish(publish, packetId))) =>
            QueueOfferState.waitForQueueOfferCompleted(
              data.remote
                .offer(ForwardPublish(publish, packetId)),
              result => QueueOfferCompleted(ByteString.empty, result.toEither),
              serverConnected(data, resetPingReqTimer = false),
              stash = Vector.empty)

          case (_, ReceivedProducerPublishingCommand(Producer.ForwardPubRel(_, packetId))) =>
            QueueOfferState.waitForQueueOfferCompleted(
              data.remote
                .offer(ForwardPubRel(packetId)),
              result => QueueOfferCompleted(ByteString.empty, result.toEither),
              serverConnected(data, resetPingReqTimer = false),
              stash = Vector.empty)

          // Keep-alive elapsed while the previous ping is still unanswered: fail the remote and disconnect.
          case (context, SendPingReqTimeout(_)) if data.pendingPingResp =>
            data.remote.fail(PingFailed)
            timer.cancel(SendPingreq)
            disconnect(context, data.remote, data)

          // Keep-alive elapsed: send a ping request, remember that a response is due and re-arm the timer.
          case (_, SendPingReqTimeout(_)) =>
            QueueOfferState.waitForQueueOfferCompleted(
              data.remote
                .offer(ForwardPingReq),
              result => QueueOfferCompleted(ByteString.empty, result.toEither),
              serverConnected(data.copy(pendingPingResp = true), resetPingReqTimer = true),
              stash = Vector.empty)

          case (_, PingRespReceivedFromRemote(_, local)) =>
            local.success(ForwardPingResp)
            // A ping response is inbound traffic. Like an inbound publish it must not postpone the next
            // ping request, otherwise pings would be spaced by keepAlive plus the round-trip time.
            serverConnected(data.copy(pendingPingResp = false), resetPingReqTimer = false)
        }
        .receiveSignal {
          // A failed subscribe / unsubscribe child fails the remote and ends the connection.
          case (context, ChildFailed(_, failure))
              if failure == Subscriber.SubscribeFailed ||
              failure == Unsubscriber.UnsubscribeFailed =>
            data.remote.fail(failure)
            // Cancel like every other disconnect path so no stale ping timeout fires afterwards.
            timer.cancel(SendPingreq)
            disconnect(context, data.remote, data)
          // A terminated consumer or producer frees its topic through a message to self; the state
          // itself is updated when that message is handled above.
          case (context, t: Terminated) =>
            data.activeConsumers.find(_._2 == t.ref) match {
              case Some((topic, _)) =>
                context.self ! ConsumerFree(topic)
              case None =>
                data.activeProducers.find(_._2 == t.ref) match {
                  case Some((topic, _)) =>
                    context.self ! ProducerFree(topic)
                  case None =>
                }
            }
            // NOTE(review): child termination re-arms the keep-alive timer; confirm that is intended.
            serverConnected(data, resetPingReqTimer = true)
          case (_, PostStop) =>
            data.remote.complete()
            Behaviors.same
        }
    }