in Cthulhu/include/cthulhu/ContextImpl.h [379:453]
/// Subscribes to several groups of streams at once, delivering data through an aligner.
///
/// Every stream ID has applyNamespace() applied, the per-stream type IDs are obtained from
/// details::SampleTypesDynamic<T...>::getTypes(), each stream is registered with the
/// framework's stream registry and attached to the aligner as a consumer.
///
/// @param streamIDs      One inner vector per group of stream IDs. The group sizes are handed
///                       to SampleTypesDynamic<T...> and to the callback generators.
///                       NOTE(review): presumably one group per element of T... — confirm
///                       against SampleTypesDynamic.
/// @param callback       Sample callback. It is wrapped so that the leading
///                       std::vector<StreamSample> argument is dropped, then installed on the
///                       aligner via setCallback().
/// @param configCallback Optional config callback. When not null it is wrapped the same way
///                       (dropping the std::vector<StreamConfig> argument) and installed via
///                       setConfigCallback().
/// @param options        Taken by value; alignerType selects the aligner and alignerPtr is
///                       moved into details::alignerFromOptions().
/// @return A MultiSubscriber holding the stream ID views and the aligner. If a registered
///         stream's type differs from the expected type, a warning is logged and a
///         MultiSubscriber built only from the views collected so far (without the aligner)
///         is returned instead.
/// @throws std::runtime_error if a type ID is not present in the type registry, or if ctx_
///         is null.
MultiSubscriber Context::subscribe(
    const std::vector<std::vector<StreamID>>& streamIDs,
    const std::function<void(const T&...)>& callback,
    const std::function<bool(const U&...)>& configCallback,
    MultiSubscriberOptions options) const {
  // Flatten StreamIDs: record each group's size and append the namespaced IDs directly to the
  // flat list (no per-group temporary vector).
  std::vector<StreamID> streamIDsFlat;
  std::vector<unsigned long> groups;
  groups.reserve(streamIDs.size());
  for (const auto& group : streamIDs) {
    groups.push_back(group.size());
    for (const auto& streamID : group) {
      streamIDsFlat.push_back(applyNamespace(streamID));
    }
  }

  // Get Types: one type ID per flattened stream, filled in by SampleTypesDynamic.
  std::vector<uint32_t> types(streamIDsFlat.size(), 0);
  details::SampleTypesDynamic<T...>::getTypes(groups, 0, 0, types);

  // Validate that every type ID is registered and remember which streams are "basic".
  std::vector<bool> basicStreams(streamIDsFlat.size());
  for (size_t i = 0; i < basicStreams.size(); ++i) {
    auto type = Framework::instance().typeRegistry()->findTypeID(types[i]);
    if (!type) {
      const auto message = "Attempted to lookup unregistered typeID: " + std::to_string(types[i]);
      XR_LOGCE("Cthulhu", "{}", message);
      throw std::runtime_error(message);
    }
    basicStreams[i] = type->isBasic();
  }

  // Create Aligner
  auto aligner = details::alignerFromOptions(options.alignerType, std::move(options.alignerPtr));

  // Create Callbacks and Register in Aligner. The user callback is copied into the wrapper,
  // which discards the raw StreamSample vector and forwards only the typed arguments.
  std::function<void(const std::vector<StreamSample>&, const T&...)> callbackAppended =
      [callback](const std::vector<StreamSample>&, const T&... args) -> void { callback(args...); };
  AlignerSampleCallback cb = details::generateAlignerCallback<0>(0, groups, callbackAppended);
  aligner->setCallback(cb);

  if (configCallback != nullptr) {
    std::function<bool(const std::vector<StreamConfig>&, const U&...)> configCallbackAppended =
        [configCallback](const std::vector<StreamConfig>&, const U&... args) -> bool {
          return configCallback(args...);
        };
    AlignerConfigCallback ccb =
        details::generateAlignerConfigCallback<0>(0, groups, configCallbackAppended, basicStreams);
    aligner->setConfigCallback(ccb);
  }

  // Get Streams from Registry and Load in Aligner
  std::vector<StreamIDView> streamIDsVec;
  streamIDsVec.reserve(streamIDsFlat.size());
  for (unsigned long i = 0; i < streamIDsFlat.size(); i++) {
    StreamDescription desc{streamIDsFlat[i], types[i]};
    auto si = Framework::instance().streamRegistry()->registerStream(desc);
    if (types[i] != si->description().type()) {
      // Type mismatch detected: bail out with only the views gathered so far and no aligner.
      XR_LOGCW("Cthulhu", "Type mismatch detected [{}, {}]", types[i], si->description().type());
      return MultiSubscriber(streamIDsVec);
    }
    streamIDsVec.push_back(StreamIDView(si->description().id()));
    aligner->registerConsumer(si, i);
  }
  aligner->finalize();

  // Return Node
  // NOTE(review): this null check runs only after the streams above have been registered;
  // it is kept in place so the existing ordering of side effects is preserved.
  if (ctx_ == nullptr) {
    const auto err = "Attempted to register multi subscriber against null context";
    XR_LOGCE("Cthulhu", "{}", err);
    throw std::runtime_error(err);
  }
  ctx_->registerSubscriber(streamIDsVec);
  return MultiSubscriber(streamIDsVec, std::move(aligner));
}