Transformer Context::transform()

in Cthulhu/include/cthulhu/ContextImpl.h [205:305]


// Builds a single-input, single-output Transformer: samples arriving on the
// input stream are converted by `sampleCallback` and produced on the output
// stream; input configurations are converted by `configCallback` and used to
// configure the output stream.
//
// Type roles as used below: T/W are the input sample/config wrappers (built
// from `const StreamSample&` / `const StreamConfig&`), U/X are the output
// sample/config wrappers.
//
// @param inputIDRaw     Input stream ID; passed through applyNamespace().
// @param outputIDRaw    Output stream ID; passed through applyNamespace().
// @param sampleCallback Fills the output sample (U) from the input sample (T).
// @param configCallback Fills the output config (X) from the input config (W)
//                       and returns whether it succeeded. May be empty, in
//                       which case the consumer gets no config callback.
// @param options        Selects ASYNC vs. non-ASYNC producer and consumer.
// @return A Transformer owning the consumer and producer. If either stream is
//         already registered with a different sample type, a Transformer
//         built from the two stream IDs only is returned instead.
// @throws std::runtime_error if the type registry rejects a (sample, config)
//         pairing, or if this context has no backing `ctx_`.
Transformer Context::transform(
    const StreamID& inputIDRaw,
    const StreamID& outputIDRaw,
    const std::function<void(const T&, U&)>& sampleCallback,
    const std::function<bool(const W&, X&)>& configCallback,
    TransformerOptions options) const {
  StreamID inputID = applyNamespace(inputIDRaw);
  StreamID outputID = applyNamespace(outputIDRaw);
  static_assert(
      std::is_constructible<T, const StreamSample&>::value,
      "Context::transform requires that sample type T is constructed with const StreamSample&");
  static_assert(
      std::is_constructible<W, const StreamConfig&>::value,
      "Context::transform requires that configuration type W is constructed with const StreamConfig&");

  // Make sure the streams are valid. A DefaultStreamConfig config type skips
  // the registry lookup for that side.
  if ((!std::is_same<W, DefaultStreamConfig>::value &&
       !Framework::instance().typeRegistry()->isValidStreamType(typeid(T), typeid(W))) ||
      (!std::is_same<X, DefaultStreamConfig>::value &&
       !Framework::instance().typeRegistry()->isValidStreamType(typeid(U), typeid(X)))) {
    const auto err = "Stream/Config Mismatch";
    XR_LOGCW("Cthulhu", "{}", err);
    throw std::runtime_error(err);
  }

  // Register (or look up) the input stream.
  auto typeIn = sampleType<T>();
  StreamDescription descIn{inputID, typeIn->typeID()};
  auto siIn = Framework::instance().streamRegistry()->registerStream(descIn);

  // Register (or look up) the output stream.
  auto typeOut = sampleType<U>();
  StreamDescription descOut{outputID, typeOut->typeID()};
  auto siOut = Framework::instance().streamRegistry()->registerStream(descOut);

  const auto& inId = siIn->description().id();
  const auto& outId = siOut->description().id();

  // The registry may hand back a stream that already carries another type.
  if (typeIn->typeID() != siIn->description().type() ||
      typeOut->typeID() != siOut->description().type()) {
    // Log the IDs that were actually compared: [requested, registered].
    XR_LOGCW(
        "Cthulhu",
        "Type mismatch detected [{}, {}] [{}, {}]",
        typeIn->typeID(),
        siIn->description().type(),
        typeOut->typeID(),
        siOut->description().type());
    return Transformer(inId, outId);
  }

  // Fail before any producer/consumer is attached to the streams, so a
  // doomed call does not briefly hook callbacks onto the input stream.
  if (ctx_ == nullptr) {
    const auto err = "Attempted to register single transformer against null context";
    XR_LOGCE("Cthulhu", "{}", err);
    throw std::runtime_error(err);
  }

  // Create Producer
  std::unique_ptr<StreamProducer> producer(
      new StreamProducer(siOut, options.producerType == ProducerType::ASYNC));

  // Create Callbacks.
  // The callbacks are handed to the consumer and outlive this function, so
  // the stream IDs are captured by value. The producer is captured as a raw
  // pointer; ownership moves into the returned Transformer.
  // NOTE(review): assumes the Transformer keeps the producer alive for as
  // long as the consumer can invoke these callbacks -- confirm.
  auto scallback = [sampleCallback, producer = producer.get(), inID = inId, outID = outId](
                       const StreamSample& in) -> void {
    const T inData(in);
    if (!producer->config()) {
      XR_LOGCW("Cthulhu", "Transformer callback not executing, output stream not configured.");
      return;
    }
    U outData = details::allocateSampleHelper<U>(producer->config(), outID);
    // TBD: What to do if callback needs to determine numSubSamples?
    sampleCallback(inData, outData);

    // Record the input sample's metadata in the output history, keyed by the
    // input stream ID, then publish the output sample.
    auto& out = outData.getSample();
    out.metadata->history.emplace(inID, in.metadata);
    producer->produceSample(out);
  };

  // Only wrap the config callback when the caller supplied one.
  ConfigCallback ccallback = nullptr;
  if (configCallback != nullptr) {
    ccallback = [configCallback, producer = producer.get()](const StreamConfig& in) -> bool {
      const W inData(in);
      X outData;
      if (!configCallback(inData, outData)) {
        return false;
      }
      const StreamConfig& out = outData.getConfig();
      producer->configureStream(out);
      return true;
    };
  }

  // Create Consumer
  std::unique_ptr<StreamConsumer> consumer(
      new StreamConsumer(siIn, scallback, ccallback, options.consumerType == ConsumerType::ASYNC));

  // Return Node
  ctx_->registerTransformer(std::vector<StreamID>{inId}, std::vector<StreamID>{outId});
  return Transformer(inId, outId, std::move(consumer), std::move(producer));
}