in Cthulhu/src/Context.cpp [113:187]
// Subscribes to a group of already-existing streams through a single aligner.
//
// Every requested stream ID is namespaced with applyNamespace() and looked up in the framework's
// stream registry. The aligner is obtained from details::alignerFromOptions(), receives the four
// callbacks, and is registered as a consumer on each stream (using the stream's position in
// streamIDs as its index) before being finalized.
//
// Parameters:
//   streamIDs            IDs of the streams to subscribe to; the namespace is applied internally.
//   sampleCallback       Forwarded to the aligner via setCallback().
//   configCallback       Forwarded to the aligner via setConfigCallback(). When non-null, none of
//                        the streams may have a basic type.
//   samplesMetaCallback  Forwarded to the aligner via setSamplesMetaCallback().
//   configsMetaCallback  Forwarded to the aligner via setConfigsMetaCallback(). When non-null,
//                        none of the streams may have a basic type.
//   options              Provides the aligner type and aligner pointer handed to
//                        details::alignerFromOptions(); the pointer is moved from.
//
// Returns a MultiSubscriber built from the stream ID views and the aligner. If any stream is
// missing from the registry, a warning is logged and a MultiSubscriber built from the
// (un-namespaced) stream ID views alone is returned instead.
//
// Throws std::runtime_error if a stream has a basic type while configCallback or
// configsMetaCallback is set, or if ctx_ is null.
MultiSubscriber Context::subscribeGeneric(
    const std::vector<StreamID>& streamIDs,
    const std::function<void(const std::vector<StreamSample>&)>& sampleCallback,
    const std::function<bool(const std::vector<StreamConfig>&)>& configCallback,
    const AlignerSamplesMetaCallback& samplesMetaCallback,
    const AlignerConfigsMetaCallback& configsMetaCallback,
    MultiSubscriberOptions options) const {
  // Apply namespace to all streamIDs
  std::vector<StreamID> streamIDs_ns;
  streamIDs_ns.reserve(streamIDs.size());
  for (const auto& id : streamIDs) {
    streamIDs_ns.emplace_back(applyNamespace(id));
  }

  // The callbacks do not change while iterating, so decide once whether configs are requested.
  const bool configsRequested = configCallback != nullptr || configsMetaCallback != nullptr;

  // Ensure that all streamIDs exist already, and that they're all non-basic. Streams must exist
  // since they cannot be created without type information, and they must all be non-basic to allow
  // configurations to get propagated correctly with the default aligner.
  std::vector<StreamInterface*> streams;
  streams.reserve(streamIDs_ns.size());
  for (const auto& streamID : streamIDs_ns) {
    auto* stream = Framework::instance().streamRegistry()->getStream(streamID);
    // Ensure stream exists
    if (stream == nullptr) {
      // Choose to return an inactive MultiSubscriber here rather than throw an exception, since
      // this is a user error and not an error with Cthulhu.
      // The message itself is the format string so that the stream ID is actually substituted.
      XR_LOGCW(
          "Cthulhu",
          "Attempted to register generic multi subscriber without topic {} existing already.",
          streamID);
      // Need to create a vector of StreamIDViews since there's no StreamID vector constructor.
      std::vector<StreamIDView> streamIDs_view;
      streamIDs_view.reserve(streamIDs.size());
      for (const auto& id : streamIDs) {
        streamIDs_view.emplace_back(id);
      }
      return MultiSubscriber(streamIDs_view);
    }

    const auto typeID = stream->description().type();
    // NOTE(review): typeInfo is dereferenced without a null check; presumably the type of every
    // registered stream is present in the type registry -- confirm.
    const auto typeInfo = Framework::instance().typeRegistry()->findTypeID(typeID);
    if (typeInfo->isBasic() && configsRequested) {
      // This is still user error, but an exception is thrown here to maintain consistency with the
      // config callback checks in the rest of the Cthulhu codebase.
      auto str = "Found a basic stream when given config callback";
      XR_LOGCE("Cthulhu", "{}", str);
      throw std::runtime_error(str);
    }
    streams.push_back(stream);
  }

  // The subscriber has to be registered against the context registry further down. Verify the
  // context before the aligner is created and attached to the streams, so that a null context
  // fails without first hooking a consumer onto every stream.
  if (ctx_ == nullptr) {
    const auto err = "Attempted to register generic multi subscriber against null context";
    XR_LOGCE("Cthulhu", "{}", err);
    throw std::runtime_error(err);
  }

  // Hook up the aligner using the options provided by the user.
  auto aligner = details::alignerFromOptions(options.alignerType, std::move(options.alignerPtr));
  aligner->setCallback(sampleCallback);
  aligner->setConfigCallback(configCallback);
  aligner->setSamplesMetaCallback(samplesMetaCallback);
  aligner->setConfigsMetaCallback(configsMetaCallback);

  // The consumer index passed to the aligner is the stream's position in the request.
  std::vector<StreamIDView> streamID_views;
  streamID_views.reserve(streams.size());
  for (size_t i = 0; i < streams.size(); ++i) {
    aligner->registerConsumer(streams[i], i);
    streamID_views.push_back(streams[i]->description().id());
  }
  aligner->finalize();

  // Finally, register against the context registry and return a new multi subscriber.
  ctx_->registerSubscriber(streamID_views);
  return MultiSubscriber(streamID_views, std::move(aligner));
}