void StreamIPCHybrid::sendSampleIPC()

in Cthulhu/src/StreamRegistryIPCHybrid.cpp [72:160]


// Mirrors `sample` into a StreamSampleIPC constructed on shm_'s segment manager and publishes it
// through ipcProducer_.
//
// Behavior:
//  - Returns immediately when ipcProducer_ is not set.
//  - CPU and GPU payloads are translated with memoryPool_->convert(); any other payload type is
//    published as BufferType::NULL_BUFFER.
//  - If the sample carries a payload, that translation failed, and the stream's registered type is
//    not basic, the sample is dropped (a warning is logged only while ipcActive_ is set).
//  - Parameters and dynamic parameters whose lookup fails are copied into a buffer obtained from
//    memoryPool_->getBufferFromSharedPoolDirect() instead of dropping the sample.
//
// NOTE(review): sample.metadata and the result of findTypeID() are dereferenced without a check;
// presumably callers/registry guarantee they are non-null - confirm.
void StreamIPCHybrid::sendSampleIPC(const StreamSample& sample) {
  if (!ipcProducer_) {
    return;
  }
  notifyMemoryPool();

  StreamSampleIPC ipcSample(shm_->get_segment_manager());

  // Remains false for payload types that have no conversion (the default branch below).
  bool lookupSuccess = false;

  switch (sample.payload.type) {
    case (BufferType::CPU): {
      auto result = memoryPool_->convert(std::get<CpuBuffer>(sample.payload.data));
      ipcSample.payload = result;
      ipcSample.payloadType = BufferType::CPU;
      lookupSuccess = result;
      break;
    }
    case (BufferType::GPU): {
      auto result = memoryPool_->convert(std::get<GpuBuffer>(sample.payload.data));
      ipcSample.payload = result;
      ipcSample.payloadType = BufferType::GPU;
      lookupSuccess = result;
      break;
    }
    default: {
      ipcSample.payloadType = BufferType::NULL_BUFFER;
      break;
    }
  }

  // A payload that could not be translated is only tolerated for basic types; otherwise the
  // sample is not published at all.
  if (sample.payload && !lookupSuccess &&
      !Framework::instance().typeRegistry()->findTypeID(description_.type())->isBasic()) {
    if (ipcActive_) {
      XR_LOGW(
          "StreamIPCHybrid - Failed to lookup shared memory pointer for payload of stream '{}'",
          description_.id());
    }
    return;
  }

  // Header fields.
  ipcSample.timestamp = sample.metadata->header.timestamp;
  ipcSample.sequenceNumber = sample.metadata->header.sequenceNumber;
  ipcSample.numberOfSubSamples = sample.numberOfSubSamples;

  // Processing stamps: each key is rebuilt on the segment manager before insertion.
  for (const auto& processingStamp : sample.metadata->processingStamps) {
    ProcessingStampKey key(shm_->get_segment_manager());
    key = processingStamp.first.c_str();
    ipcSample.processingStamps[key] = processingStamp.second;
  }

  if (sample.parameters) {
    auto sharedParametersPtr = memoryPool_->convert(sample.parameters);
    if (sharedParametersPtr) {
      ipcSample.parameters = sharedParametersPtr;
    } else {
      XR_LOGW_ONCE(
          "StreamIPCHybrid - Failed to lookup shared memory pointer when sending parameters of stream '{}'",
          description_.id());
      // Fallback: take a buffer straight from the pool and copy the parameter block into it.
      // NOTE(review): the returned buffer is not checked before use - confirm it cannot be empty.
      ipcSample.parameters = memoryPool_->getBufferFromSharedPoolDirect(sampleParameterSize_);
      // memcpy has undefined behavior on null pointers even for a zero-length copy, so skip it
      // when there is nothing to copy.
      if (sampleParameterSize_ > 0) {
        std::memcpy(
            ipcSample.parameters.get().get(), sample.parameters.get(), sampleParameterSize_);
      }
    }
  }

  if (sample.dynamicParameters) {
    ipcSample.dynamicSampleParameters =
        DynamicFields(sampleDynamicFieldCount_, shm_->get_segment_manager());
    // NOTE(review): assumes sample.dynamicParameters holds at least sampleDynamicFieldCount_
    // entries - confirm against the producer of StreamSample.
    for (size_t idx = 0; idx < ipcSample.dynamicSampleParameters.size(); ++idx) {
      auto& rawDynamicIPC = ipcSample.dynamicSampleParameters[idx];
      const auto& rawDynamic = *(sample.dynamicParameters.get() + idx);
      rawDynamicIPC.elementCount = rawDynamic.elementCount;
      rawDynamicIPC.elementSize = rawDynamic.elementSize;

      auto sharedDynamicParameterPtr = memoryPool_->convert(rawDynamic.raw);
      if (sharedDynamicParameterPtr) {
        rawDynamicIPC.raw = sharedDynamicParameterPtr;
      } else {
        XR_LOGW_ONCE(
            "StreamIPCHybrid - Failed to lookup shared memory pointer when sending dynamic parameter {} of stream '{}'",
            idx,
            description_.id());
        // Compute the size once so the allocation and the copy can never disagree.
        const auto byteCount = rawDynamicIPC.elementCount * rawDynamicIPC.elementSize;
        rawDynamicIPC.raw = memoryPool_->getBufferFromSharedPoolDirect(byteCount);
        // Same zero-length guard as for the static parameters above.
        if (byteCount > 0) {
          std::memcpy(rawDynamicIPC.raw.get().get(), rawDynamic.raw.get(), byteCount);
        }
      }
    }
  }

  ipcProducer_->publish(ipcSample);
}