in labgraph/graphs/graph.py [0:0]
def _validate_topics(self) -> None:
"""
Validates all the graph's topics have publishers and subscribers. Raises an
error if any topic lacks a publisher or subscriber.
"""
stream_is_published = {
stream_id: False for stream_id in self.__streams__.keys()
}
stream_is_subscribed = {
stream_id: False for stream_id in self.__streams__.keys()
}
for publisher in self.publishers.values():
for stream_id in stream_is_published.keys():
stream_set = set(self.__streams__[stream_id].topic_paths)
if len(stream_set.intersection(publisher.published_topic_paths)) > 0:
stream_is_published[stream_id] = True
for subscriber in self.subscribers.values():
for stream_id in stream_is_subscribed.keys():
if (
subscriber.subscribed_topic_path
in self.__streams__[stream_id].topic_paths
):
stream_is_subscribed[stream_id] = True
unpublished_streams = {
stream_id
for stream_id, published in stream_is_published.items()
if not published
}
unsubscribed_streams = {
stream_id
for stream_id, subscribed in stream_is_subscribed.items()
if not subscribed
}
if len(unpublished_streams) > 0 or len(unsubscribed_streams) > 0:
message = f"{self.__class__.__name__} has unused topics:\n"
submessages = []
if len(unpublished_streams) > 0:
for stream_id in unpublished_streams:
for topic_path in self.__streams__[stream_id].topic_paths:
submessages.append(f"\t- {topic_path} has no publishers\n")
if len(unsubscribed_streams) > 0:
for stream_id in unsubscribed_streams:
for topic_path in self.__streams__[stream_id].topic_paths:
submessages.append(f"\t- {topic_path} has no subscribers\n")
message += "".join(sorted(submessages))
message += (
"This could mean that there are publishers and/or subscribers of "
"Cthulhu streams that Labgraph doesn't know about, and/or that data "
"in some topics is being discarded.\n"
)
# TODO: We warn instead of raising an error because Labgraph currently
# tries to run any publishers/subscribers it knows about as async functions,
# so for now we keep it ignorant of C++ publisher/subcriber methods.
logger.warning(message.strip())