MultiSubscriber Context::subscribeGeneric()

in Cthulhu/src/Context.cpp [113:187]


MultiSubscriber Context::subscribeGeneric(
    const std::vector<StreamID>& streamIDs,
    const std::function<void(const std::vector<StreamSample>&)>& sampleCallback,
    const std::function<bool(const std::vector<StreamConfig>&)>& configCallback,
    const AlignerSamplesMetaCallback& samplesMetaCallback,
    const AlignerConfigsMetaCallback& configsMetaCallback,
    MultiSubscriberOptions options) const {
  // Apply namespace to all streamIDs
  std::vector<StreamID> streamIDs_ns;
  streamIDs_ns.reserve(streamIDs.size());
  for (const auto& id : streamIDs) {
    streamIDs_ns.emplace_back(applyNamespace(id));
  }

  // Ensure that all streamIDs exist already, and that they're all non-basic. Streams must exist
  // since they cannot be created without type information, and they must all be non-basic to allow
  // configurations to get propagated correctly with the default aligner.
  std::vector<StreamInterface*> streams;
  streams.reserve(streamIDs_ns.size());
  for (const auto& streamID : streamIDs_ns) {
    auto* stream = Framework::instance().streamRegistry()->getStream(streamID);
    // Ensure stream exists
    if (stream == nullptr) {
      // Choose to return an inactive MultiSubscriber here rather than throw an exception, since
      // this is a user error and not an error with Cthulhu.
      XR_LOGCW(
          "Cthulhu",
          "{}",
          "Attempted to register generic multi subscriber without topic {} existing already.",
          streamID);
      // Need to create a vector of StreamIDViews since there's no StreamID vector constructor.
      std::vector<StreamIDView> streamIDs_view;
      streamIDs_view.reserve(streamIDs.size());
      for (const auto& id : streamIDs) {
        streamIDs_view.emplace_back(id);
      }
      return MultiSubscriber(streamIDs_view);
    }

    const auto typeID = stream->description().type();
    const auto typeInfo = Framework::instance().typeRegistry()->findTypeID(typeID);
    if (typeInfo->isBasic() && (configCallback != nullptr || configsMetaCallback != nullptr)) {
      // This is still user error, but an exception is thrown here to maintain consistency with the
      // config callback checks in the rest of the Cthulhu codebase.
      auto str = "Found a basic stream when given config callback";
      XR_LOGCE("Cthulhu", "{}", str);
      throw std::runtime_error(str);
    }
    streams.push_back(stream);
  }

  // Hook up the aligner using the options provided by the user.
  auto aligner = details::alignerFromOptions(options.alignerType, std::move(options.alignerPtr));
  aligner->setCallback(sampleCallback);
  aligner->setConfigCallback(configCallback);
  aligner->setSamplesMetaCallback(samplesMetaCallback);
  aligner->setConfigsMetaCallback(configsMetaCallback);

  std::vector<StreamIDView> streamID_views;
  streamID_views.reserve(streamIDs_ns.size());
  for (size_t i = 0; i < streamIDs_ns.size(); ++i) {
    aligner->registerConsumer(streams[i], i);
    streamID_views.push_back(streams[i]->description().id());
  }
  aligner->finalize();

  // Finally, register against the context registry and return a new multi subscriber.
  if (ctx_ == nullptr) {
    const auto err = "Attempted to register generic multi subscriber against null context";
    XR_LOGCE("Cthulhu", "{}", err);
    throw std::runtime_error(err);
  }
  ctx_->registerSubscriber(streamID_views);
  return MultiSubscriber(streamID_views, std::move(aligner));
}