in Cthulhu/include/cthulhu/ContextImpl.h [205:305]
Transformer Context::transform(
    const StreamID& inputIDRaw,
    const StreamID& outputIDRaw,
    const std::function<void(const T&, U&)>& sampleCallback,
    const std::function<bool(const W&, X&)>& configCallback,
    TransformerOptions options) const {
  // Builds a single-input / single-output transformer: a consumer on the input
  // stream feeds sampleCallback, and whatever the callback writes into its
  // output argument is published on the output stream through a producer.
  //
  // Parameters:
  //   inputIDRaw / outputIDRaw  Stream IDs; applyNamespace() is applied to both.
  //   sampleCallback            Called as sampleCallback(const T&, U&) for each
  //                             input sample once the output stream has a config.
  //   configCallback            Called as configCallback(const W&, X&) for input
  //                             configs; when it returns true the X is used to
  //                             configure the output stream. May be empty, in
  //                             which case the consumer gets no config callback.
  //   options                   consumerType / producerType select ASYNC mode.
  //
  // Returns:
  //   A Transformer owning the consumer and producer. If either stream is
  //   already registered with a different sample type, a Transformer carrying
  //   only the two stream IDs is returned instead (and a warning is logged).
  //
  // Throws:
  //   std::runtime_error if (T, W) or (U, X) is not a valid stream/config pair,
  //   or if this context has no ctx_ to register the transformer against.
  static_assert(
      std::is_constructible<T, const StreamSample&>::value,
      "Context::transform requires that sample type T is constructed with const StreamSample&");
  static_assert(
      std::is_constructible<W, const StreamConfig&>::value,
      "Context::transform requires that configuration type W is constructed with const StreamConfig&");

  const StreamID inputID = applyNamespace(inputIDRaw);
  const StreamID outputID = applyNamespace(outputIDRaw);

  // Make sure the streams are valid. DefaultStreamConfig is exempt from the
  // registry lookup on either side.
  if ((!std::is_same<W, DefaultStreamConfig>::value &&
       !Framework::instance().typeRegistry()->isValidStreamType(typeid(T), typeid(W))) ||
      (!std::is_same<X, DefaultStreamConfig>::value &&
       !Framework::instance().typeRegistry()->isValidStreamType(typeid(U), typeid(X)))) {
    auto str = "Stream/Config Mismatch";
    XR_LOGCW("Cthulhu", "{}", str);
    throw std::runtime_error(str);
  }

  // Register (or look up) the input stream, then the output stream.
  auto typeIn = sampleType<T>();
  StreamDescription descIn{inputID, typeIn->typeID()};
  auto siIn = Framework::instance().streamRegistry()->registerStream(descIn);

  auto typeOut = sampleType<U>();
  StreamDescription descOut{outputID, typeOut->typeID()};
  auto siOut = Framework::instance().streamRegistry()->registerStream(descOut);

  const auto& inId = siIn->description().id();
  const auto& outId = siOut->description().id();

  if (typeIn->typeID() != siIn->description().type() ||
      typeOut->typeID() != siOut->description().type()) {
    // Type mismatch detected. Log the same IDs that were compared above:
    // [requested, registered] for the input, then for the output.
    XR_LOGCW(
        "Cthulhu",
        "Type mismatch detected [{}, {}] [{}, {}]",
        typeIn->typeID(),
        siIn->description().type(),
        typeOut->typeID(),
        siOut->description().type());
    return Transformer(inId, outId);
  }

  // Fail before creating the producer/consumer so that a null context does not
  // leave callbacks briefly attached to the input stream.
  if (ctx_ == nullptr) {
    const auto err = "Attempted to register single transformer against null context";
    XR_LOGCE("Cthulhu", "{}", err);
    throw std::runtime_error(err);
  }

  // Create Producer
  std::unique_ptr<StreamProducer> producer(
      new StreamProducer(siOut, options.producerType == ProducerType::ASYNC));

  // Create Callbacks. Both closures are handed to the consumer and run after
  // this function returns, so they hold copies of the callbacks and stream IDs
  // rather than references. The raw producer pointer stays valid because the
  // producer is moved into the same Transformer as the consumer.
  auto scallback = [sampleCallback,
                    producer = producer.get(),
                    inID = inId,
                    outID = outId](const StreamSample& in) -> void {
    if (!producer->config()) {
      XR_LOGCW("Cthulhu", "Transformer callback not executing, output stream not configured.");
      return;
    }
    const T inData(in);
    U outData = details::allocateSampleHelper<U>(producer->config(), outID);
    // TBD: What to do if callback needs to determine numSubSamples?
    sampleCallback(inData, outData);
    auto& out = outData.getSample();
    // Record which input sample this output was derived from.
    out.metadata->history.emplace(inID, in.metadata);
    producer->produceSample(out);
  };

  // Only build a config callback when the caller supplied one; otherwise the
  // consumer is given an empty callback.
  ConfigCallback ccallback = nullptr;
  if (configCallback) {
    ccallback = [configCallback, producer = producer.get()](const StreamConfig& in) -> bool {
      const W inData(in);
      X outData;
      if (!configCallback(inData, outData)) {
        return false;
      }
      const StreamConfig& out = outData.getConfig();
      producer->configureStream(out);
      return true;
    };
  }

  // Create Consumer
  std::unique_ptr<StreamConsumer> consumer(
      new StreamConsumer(siIn, scallback, ccallback, options.consumerType == ConsumerType::ASYNC));

  // Return Node
  ctx_->registerTransformer(std::vector<StreamID>{inId}, std::vector<StreamID>{outId});
  return Transformer(inId, outId, std::move(consumer), std::move(producer));
}