void Node::bootstrapStream()

in labgraph/cpp/Node.cpp [36:94]


void Node::bootstrapStream(const std::string& topic, const cthulhu::StreamID& streamID) {
  auto topics = getTopics();
  if (std::find(topics.begin(), topics.end(), topic) == topics.end()) {
    throw std::runtime_error("C++ node bootstrapped with invalid topic '" + topic + "'");
  }
  if (streamIDsByTopic_.count(topic) != 0) {
    throw std::runtime_error("C++ node bootstrapped topic '" + topic + "' multiple times");
  }
  streamIDsByTopic_[topic] = streamID;

  auto labgraphPublishers = getPublishers();
  auto labgraphSubscribers = getSubscribers();
  auto labgraphTransformers = getTransformers();

  cthulhu::StreamInterface* si =
      cthulhu::Framework::instance().streamRegistry()->getStream(streamID);
  if (!si) {
    throw std::runtime_error(
        "C++ node bootstrapped topic '" + topic + "' with invalid stream ID '" + streamID + "'");
  }
  cthulhu::StreamDescription desc = si->description();

  for (const auto& publisher : labgraphPublishers) {
    auto topicsBegin = publisher.publishedTopics.begin();
    auto topicsEnd = publisher.publishedTopics.end();
    if (std::find(topicsBegin, topicsEnd, topic) != topicsEnd) {
      auto cthulhuType = cthulhu::Framework::instance().typeRegistry()->findTypeID(desc.type());
      cthulhuPublishersByTopic_[topic] =
          cthulhu::ptrWrap(context_.advertise(desc.id(), desc.type()));
      break;
    }
  }

  for (const auto& subscriber : labgraphSubscribers) {
    if (subscriber.subscribedTopic == topic) {
      cthulhuSubscribersByTopic_.insert(
          {topic,
           cthulhu::ptrWrap(context_.subscribeGeneric(
               desc.id(), subscriber.subscriber, nullptr, {cthulhu::ConsumerType::ASYNC}))});
      break;
    }
  }

  for (const auto& transformer : labgraphTransformers) {
    auto topicsBegin = transformer.publishedTopics.begin();
    auto topicsEnd = transformer.publishedTopics.end();
    if (std::find(topicsBegin, topicsEnd, topic) != topicsEnd &&
        cthulhuPublishersByTopic_.count(topic) == 0) {
      cthulhuPublishersByTopic_[topic] =
          cthulhu::ptrWrap(context_.advertise(desc.id(), desc.type()));
    }
    if (transformer.subscribedTopic == topic && cthulhuSubscribersByTopic_.count(topic) == 0) {
      cthulhuSubscribersByTopic_.insert(
          {topic,
           cthulhu::ptrWrap(context_.subscribeGeneric(
               desc.id(), transformer.transformer, nullptr, {cthulhu::ConsumerType::ASYNC}))});
    }
  }
}