in labgraph/cpp/Node.cpp [36:94]
/// Binds a LabGraph topic of this node to a Cthulhu stream.
///
/// After validating the topic and the stream ID, this creates the Cthulhu
/// publisher and/or subscriber for the topic:
///  - a publisher is advertised if any publisher or transformer of this node
///    lists `topic` among its published topics;
///  - an ASYNC generic subscriber is registered for the first subscriber whose
///    subscribed topic equals `topic`, or otherwise for a transformer that
///    subscribes to it.
///
/// @param topic    Topic name; must be one of the names returned by getTopics().
/// @param streamID ID of the stream to look up in the Cthulhu stream registry.
/// @throws std::runtime_error if `topic` is not one of this node's topics, if
///         `topic` has already been bootstrapped, or if the stream registry
///         has no stream for `streamID`.
void Node::bootstrapStream(const std::string& topic, const cthulhu::StreamID& streamID) {
  const auto& topics = getTopics();
  if (std::find(topics.begin(), topics.end(), topic) == topics.end()) {
    throw std::runtime_error("C++ node bootstrapped with invalid topic '" + topic + "'");
  }
  if (streamIDsByTopic_.count(topic) != 0) {
    throw std::runtime_error("C++ node bootstrapped topic '" + topic + "' multiple times");
  }

  const auto& labgraphPublishers = getPublishers();
  const auto& labgraphSubscribers = getSubscribers();
  const auto& labgraphTransformers = getTransformers();

  cthulhu::StreamInterface* si =
      cthulhu::Framework::instance().streamRegistry()->getStream(streamID);
  if (!si) {
    throw std::runtime_error(
        "C++ node bootstrapped topic '" + topic + "' with invalid stream ID '" + streamID + "'");
  }

  // Record the topic only once the stream is known to exist. Recording it
  // before the check above would leave the topic marked as bootstrapped after
  // a failed call, so a retry with a valid stream ID would be rejected as a
  // duplicate.
  streamIDsByTopic_[topic] = streamID;

  const cthulhu::StreamDescription desc = si->description();

  // One Cthulhu publisher per topic is enough, so stop at the first match.
  for (const auto& publisher : labgraphPublishers) {
    const auto topicsBegin = publisher.publishedTopics.begin();
    const auto topicsEnd = publisher.publishedTopics.end();
    if (std::find(topicsBegin, topicsEnd, topic) != topicsEnd) {
      cthulhuPublishersByTopic_[topic] =
          cthulhu::ptrWrap(context_.advertise(desc.id(), desc.type()));
      break;
    }
  }

  // Only the first subscriber declared for this topic is registered.
  for (const auto& subscriber : labgraphSubscribers) {
    if (subscriber.subscribedTopic == topic) {
      cthulhuSubscribersByTopic_.insert(
          {topic,
           cthulhu::ptrWrap(context_.subscribeGeneric(
               desc.id(), subscriber.subscriber, nullptr, {cthulhu::ConsumerType::ASYNC}))});
      break;
    }
  }

  // Transformers can both publish and subscribe. The count() guards skip
  // whichever side was already set up above or by an earlier transformer.
  for (const auto& transformer : labgraphTransformers) {
    const auto topicsBegin = transformer.publishedTopics.begin();
    const auto topicsEnd = transformer.publishedTopics.end();
    if (std::find(topicsBegin, topicsEnd, topic) != topicsEnd &&
        cthulhuPublishersByTopic_.count(topic) == 0) {
      cthulhuPublishersByTopic_[topic] =
          cthulhu::ptrWrap(context_.advertise(desc.id(), desc.type()));
    }
    if (transformer.subscribedTopic == topic && cthulhuSubscribersByTopic_.count(topic) == 0) {
      cthulhuSubscribersByTopic_.insert(
          {topic,
           cthulhu::ptrWrap(context_.subscribeGeneric(
               desc.id(), transformer.transformer, nullptr, {cthulhu::ConsumerType::ASYNC}))});
    }
  }
}