in mqtt-streaming/src/main/scala/org/apache/pekko/stream/connectors/mqtt/streaming/impl/RequestState.scala [568:626]
def main(registrantsByPacketId: Map[(Option[String], PacketId), Registration[A]],
clientIdsByConnectionId: Map[ByteString, String]): Behavior[Request[A]] =
Behaviors.receive {
case (context, Register(registrant: ActorRef[A], clientId, packetId, reply)) =>
reply.success(Registered)
context.watchWith(registrant, Unregister(clientId, packetId))
val key = (clientId, packetId)
main(registrantsByPacketId + (key -> Registration(registrant, List.empty)), clientIdsByConnectionId)
case (_, RegisterConnection(connectionId, clientId)) =>
main(registrantsByPacketId, clientIdsByConnectionId + (connectionId -> clientId))
case (_, Unregister(clientId, packetId)) =>
// We tidy up and fail any failure promises that haven't already been failed -
// just in case the registrant terminated abnormally and didn't get to complete
// the promise. We all know that uncompleted promises can lead to memory leaks.
// The known condition by which we'd succeed in failing the promise here is
// when we thought we were able to route to a registrant, but the routing
// subsequently failed, ending up the in the deadletter queue.
registrantsByPacketId.get((clientId, packetId)).toList.flatMap(_.failureReplies).foreach { failureReply =>
failureReply.tryFailure(CannotRoute(packetId))
}
val key = (clientId, packetId)
main(registrantsByPacketId - key, clientIdsByConnectionId)
case (_, UnregisterConnection(connectionId)) =>
main(registrantsByPacketId, clientIdsByConnectionId - connectionId)
case (_, Route(clientId, packetId, event, failureReply)) =>
val key = (clientId, packetId)
registrantsByPacketId.get(key) match {
case Some(registration) =>
registration.registrant ! event
main(
registrantsByPacketId.updated(
(clientId, packetId),
registration.copy(failureReplies = failureReply +: registration.failureReplies)),
clientIdsByConnectionId)
case None =>
failureReply.failure(CannotRoute(packetId))
Behaviors.same
}
case (_, RouteViaConnection(connectionId, packetId, event, failureReply)) =>
clientIdsByConnectionId.get(connectionId) match {
case clientId: Some[String] =>
val key = (clientId, packetId)
registrantsByPacketId.get(key) match {
case Some(registration) =>
registration.registrant ! event
main(
registrantsByPacketId.updated(
(clientId, packetId),
registration.copy(failureReplies = failureReply +: registration.failureReplies)),
clientIdsByConnectionId)
case None =>
failureReply.failure(CannotRoute(packetId))
Behaviors.same
}
case None =>
failureReply.failure(CannotRoute(packetId))
Behaviors.same
}
}