in Cthulhu/src/StreamRegistryIPCHybrid.cpp [72:160]
/// Builds a StreamSampleIPC from a local StreamSample and publishes it through
/// ipcProducer_.
///
/// Steps, in order:
///  1. Return immediately when there is no IPC producer.
///  2. Translate the payload through memoryPool_->convert(). When the sample
///     has a payload, the translation failed, and the stream type is not
///     "basic", the sample is dropped (a warning is logged only while
///     ipcActive_ is set).
///  3. Copy timestamp, sequence number, sub-sample count and processing stamps.
///  4. Translate the parameters and each dynamic parameter field through
///     memoryPool_->convert(). When that fails, fall back to copying the bytes
///     into a buffer obtained from getBufferFromSharedPoolDirect().
///  5. Publish the assembled sample.
///
/// The fallback copies are guarded: a zero-byte field is never passed to
/// std::memcpy, and if a non-empty field cannot be copied (null destination or
/// null source) the sample is dropped with a warning instead of writing
/// through a null pointer.
///
/// @param sample The local sample to forward over IPC. sample.metadata is
///               dereferenced unconditionally.
///               NOTE(review): assumed to be non-null for every sample that
///               reaches this function - confirm against callers.
void StreamIPCHybrid::sendSampleIPC(const StreamSample& sample) {
  if (!ipcProducer_) {
    return;
  }

  notifyMemoryPool();

  StreamSampleIPC ipcSample(shm_->get_segment_manager());

  // Translate the payload into its shared-memory representation. lookupSuccess
  // stays false for buffer types that are not handled below.
  bool lookupSuccess = false;
  switch (sample.payload.type) {
    case (BufferType::CPU): {
      auto result = memoryPool_->convert(std::get<CpuBuffer>(sample.payload.data));
      ipcSample.payload = result;
      ipcSample.payloadType = BufferType::CPU;
      lookupSuccess = result;
      break;
    }
    case (BufferType::GPU): {
      auto result = memoryPool_->convert(std::get<GpuBuffer>(sample.payload.data));
      ipcSample.payload = result;
      ipcSample.payloadType = BufferType::GPU;
      lookupSuccess = result;
      break;
    }
    default: {
      ipcSample.payloadType = BufferType::NULL_BUFFER;
      break;
    }
  }

  // A non-basic stream whose payload could not be translated cannot be
  // forwarded; drop the sample. The type registry is only consulted when the
  // first two conditions hold (short-circuit evaluation).
  // NOTE(review): findTypeID() is dereferenced without a null check -
  // presumably the stream's type is always registered; confirm.
  if (sample.payload && !lookupSuccess &&
      !Framework::instance().typeRegistry()->findTypeID(description_.type())->isBasic()) {
    if (ipcActive_) {
      XR_LOGW(
          "StreamIPCHybrid - Failed to lookup shared memory pointer for payload of stream '{}'",
          description_.id());
    }
    return;
  }

  ipcSample.timestamp = sample.metadata->header.timestamp;
  ipcSample.sequenceNumber = sample.metadata->header.sequenceNumber;
  ipcSample.numberOfSubSamples = sample.numberOfSubSamples;

  // Keys are rebuilt with the segment manager so that they live in the shared
  // segment alongside the sample.
  for (const auto& processingStamp : sample.metadata->processingStamps) {
    ProcessingStampKey key(shm_->get_segment_manager());
    key = processingStamp.first.c_str();
    ipcSample.processingStamps[key] = processingStamp.second;
  }

  if (sample.parameters) {
    auto sharedParametersPtr = memoryPool_->convert(sample.parameters);
    if (sharedParametersPtr) {
      ipcSample.parameters = sharedParametersPtr;
    } else {
      XR_LOGW_ONCE(
          "StreamIPCHybrid - Failed to lookup shared memory pointer when sending parameters of stream '{}'",
          description_.id());
      // Fallback: copy the parameter block into a freshly obtained shared buffer.
      ipcSample.parameters = memoryPool_->getBufferFromSharedPoolDirect(sampleParameterSize_);
      if (sampleParameterSize_ != 0) {
        void* const destination = ipcSample.parameters.get().get();
        if (destination == nullptr) {
          // Writing through a null destination would be undefined behaviour;
          // drop the sample instead, like the payload failure path above.
          XR_LOGW_ONCE(
              "StreamIPCHybrid - Failed to obtain shared memory for parameters of stream '{}', dropping sample",
              description_.id());
          return;
        }
        std::memcpy(destination, sample.parameters.get(), sampleParameterSize_);
      }
    }
  }

  if (sample.dynamicParameters) {
    // NOTE(review): assumes sample.dynamicParameters holds at least
    // sampleDynamicFieldCount_ entries - confirm against the producer side.
    ipcSample.dynamicSampleParameters =
        DynamicFields(sampleDynamicFieldCount_, shm_->get_segment_manager());
    for (size_t idx = 0; idx < ipcSample.dynamicSampleParameters.size(); ++idx) {
      auto& rawDynamicIPC = ipcSample.dynamicSampleParameters[idx];
      const auto& rawDynamic = *(sample.dynamicParameters.get() + idx);
      rawDynamicIPC.elementCount = rawDynamic.elementCount;
      rawDynamicIPC.elementSize = rawDynamic.elementSize;

      auto sharedDynamicParameterPtr = memoryPool_->convert(rawDynamic.raw);
      if (sharedDynamicParameterPtr) {
        rawDynamicIPC.raw = sharedDynamicParameterPtr;
        continue;
      }

      XR_LOGW_ONCE(
          "StreamIPCHybrid - Failed to lookup shared memory pointer when sending dynamic parameter {} of stream '{}'",
          idx,
          description_.id());

      // Fallback: copy the field's bytes into a freshly obtained shared buffer.
      const auto byteCount = rawDynamicIPC.elementCount * rawDynamicIPC.elementSize;
      rawDynamicIPC.raw = memoryPool_->getBufferFromSharedPoolDirect(byteCount);
      if (byteCount == 0) {
        // Nothing to copy. std::memcpy requires valid pointers even for a
        // zero-length copy, and an empty field may have a null source.
        continue;
      }

      void* const destination = rawDynamicIPC.raw.get().get();
      const void* const source = rawDynamic.raw.get();
      if (destination == nullptr || source == nullptr) {
        XR_LOGW_ONCE(
            "StreamIPCHybrid - Failed to copy dynamic parameter {} of stream '{}' into shared memory, dropping sample",
            idx,
            description_.id());
        return;
      }
      std::memcpy(destination, source, byteCount);
    }
  }

  ipcProducer_->publish(ipcSample);
}