def main()

in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala [568:626]


  /**
   * Message handler for the packet router. The routing state lives entirely in
   * the two immutable maps passed as arguments: each request that changes state
   * returns `main` applied to updated copies, and a request that cannot be routed
   * fails its promise and keeps the current behavior.
   *
   * @param registrantsByPacketId registrations keyed by (optional client id, packet id)
   * @param clientIdsByConnectionId client ids keyed by connection id; consulted to
   *                                build the registration key for `RouteViaConnection`
   * @return the behavior that handles the next request
   */
  def main(registrantsByPacketId: Map[(Option[String], PacketId), Registration[A]],
      clientIdsByConnectionId: Map[ByteString, String]): Behavior[Request[A]] =
    Behaviors.receive { (context, request) =>
      request match {
        case Register(registrant: ActorRef[A], clientId, packetId, reply) =>
          // Acknowledge first, then have the registrant's termination deliver an
          // Unregister for the same key.
          reply.success(Registered)
          context.watchWith(registrant, Unregister(clientId, packetId))
          val freshRegistration = Registration(registrant, List.empty)
          main(registrantsByPacketId.updated((clientId, packetId), freshRegistration), clientIdsByConnectionId)

        case RegisterConnection(connectionId, clientId) =>
          main(registrantsByPacketId, clientIdsByConnectionId.updated(connectionId, clientId))

        case Unregister(clientId, packetId) =>
          // Tidy up by failing every failure promise that has not already been
          // completed - just in case the registrant terminated abnormally and never
          // got to complete it. Uncompleted promises can lead to memory leaks.
          // The known condition under which failing the promise here succeeds is
          // when we believed we could route to a registrant, but the routing
          // subsequently failed and the event ended up in the dead letter queue.
          val registrationKey = (clientId, packetId)
          for {
            registration <- registrantsByPacketId.get(registrationKey)
            pendingReply <- registration.failureReplies
          } pendingReply.tryFailure(CannotRoute(packetId))
          main(registrantsByPacketId - registrationKey, clientIdsByConnectionId)

        case UnregisterConnection(connectionId) =>
          main(registrantsByPacketId, clientIdsByConnectionId - connectionId)

        case Route(clientId, packetId, event, failureReply) =>
          val registrationKey = (clientId, packetId)
          registrantsByPacketId.get(registrationKey).fold[Behavior[Request[A]]] {
            // Nobody is registered under this key.
            failureReply.failure(CannotRoute(packetId))
            Behaviors.same
          } { registration =>
            registration.registrant ! event
            // Remember the promise so that Unregister can fail it later if needed.
            val tracked = registration.copy(failureReplies = failureReply +: registration.failureReplies)
            main(registrantsByPacketId.updated(registrationKey, tracked), clientIdsByConnectionId)
          }

        case RouteViaConnection(connectionId, packetId, event, failureReply) =>
          // Resolve connection id -> client id -> registration in one pass; either
          // lookup missing leaves `resolved` empty.
          val resolved = clientIdsByConnectionId.get(connectionId).flatMap { clientId =>
            val registrationKey: (Option[String], PacketId) = (Some(clientId), packetId)
            registrantsByPacketId.get(registrationKey).map(registration => (registrationKey, registration))
          }
          resolved.fold[Behavior[Request[A]]] {
            failureReply.failure(CannotRoute(packetId))
            Behaviors.same
          } {
            case (registrationKey, registration) =>
              registration.registrant ! event
              // Remember the promise so that Unregister can fail it later if needed.
              val tracked = registration.copy(failureReplies = failureReply +: registration.failureReplies)
              main(registrantsByPacketId.updated(registrationKey, tracked), clientIdsByConnectionId)
          }
      }
    }