def _validate_topics()

in labgraph/graphs/graph.py [0:0]


    def _validate_topics(self) -> None:
        """
        Validates all the graph's topics have publishers and subscribers. Raises an
        error if any topic lacks a publisher or subscriber.
        """
        stream_is_published = {
            stream_id: False for stream_id in self.__streams__.keys()
        }
        stream_is_subscribed = {
            stream_id: False for stream_id in self.__streams__.keys()
        }

        for publisher in self.publishers.values():
            for stream_id in stream_is_published.keys():
                stream_set = set(self.__streams__[stream_id].topic_paths)
                if len(stream_set.intersection(publisher.published_topic_paths)) > 0:
                    stream_is_published[stream_id] = True

        for subscriber in self.subscribers.values():
            for stream_id in stream_is_subscribed.keys():
                if (
                    subscriber.subscribed_topic_path
                    in self.__streams__[stream_id].topic_paths
                ):
                    stream_is_subscribed[stream_id] = True

        unpublished_streams = {
            stream_id
            for stream_id, published in stream_is_published.items()
            if not published
        }
        unsubscribed_streams = {
            stream_id
            for stream_id, subscribed in stream_is_subscribed.items()
            if not subscribed
        }

        if len(unpublished_streams) > 0 or len(unsubscribed_streams) > 0:
            message = f"{self.__class__.__name__} has unused topics:\n"
            submessages = []
            if len(unpublished_streams) > 0:
                for stream_id in unpublished_streams:
                    for topic_path in self.__streams__[stream_id].topic_paths:
                        submessages.append(f"\t- {topic_path} has no publishers\n")
            if len(unsubscribed_streams) > 0:
                for stream_id in unsubscribed_streams:
                    for topic_path in self.__streams__[stream_id].topic_paths:
                        submessages.append(f"\t- {topic_path} has no subscribers\n")
            message += "".join(sorted(submessages))
            message += (
                "This could mean that there are publishers and/or subscribers of "
                "Cthulhu streams that Labgraph doesn't know about, and/or that data "
                "in some topics is being discarded.\n"
            )

            # TODO: We warn instead of raising an error because Labgraph currently
            # tries to run any publishers/subscribers it knows about as async functions,
            # so for now we keep it ignorant of C++ publisher/subcriber methods.
            logger.warning(message.strip())