in Cthulhu/src/SubAligner.cpp [342:435]
void SubAligner::sampleCallback(size_t idx, const StreamSample& sample) {
int activeContext;
{
std::lock_guard<std::mutex> lock(globalMutex_);
activeContext = streams_[idx].activeContext;
// Force everything onto the latest context, for the case where not all streams are
// reconfigured simultaneously
// Note: It may be better to estimate start/end-times for the contexts to determine
if (activeContext < latestContext_) {
auto& config = contexts_[activeContext].streams[idx].config;
streams_[idx].activeContext = latestContext_;
activeContext = latestContext_;
auto& context = contexts_.at(activeContext);
enroll(idx, config, context, lock);
}
}
auto& context = contexts_.at(activeContext);
if (std::holds_alternative<PrimarySelection>(finalizeStrategy_) &&
!std::get<PrimarySelection>(finalizeStrategy_)
.isWithinTolerance(sample.metadata->header.timestamp)) {
XR_LOGW_EVERY_N_SEC(
5,
"Too old sample arrived on stream: #{}, '{}', stamp: {}, tolerance: {}",
idx,
streams_[idx].streamID.c_str(),
sample.metadata->header.timestamp,
std::get<PrimarySelection>(finalizeStrategy_).maxLatencySeconds);
// dump sample
return;
}
// feed underlying aligner
{
sample.metadata->processingStamps["subaligner_start"] = NOW();
const auto alignerLock = std::scoped_lock(context.implMutex, context.streams[idx].streamMutex);
// Store the sample data
const uint32_t seq = context.streams[idx].sequenceIn++;
context.streams[idx].sampleMap[seq] = sample;
if (!hasSampleCallback()) {
// Drop payload: we don't need to carry this buffer around for alignment, since
// we won't look at it later.
context.streams[idx].sampleMap[seq].payload = nullptr;
}
// Feed data in aligner, but act only on the "metadata" as the references are already held by
// sampleMap
const size_t buffer_size =
sample.numberOfSubSamples * context.streams[idx].config.sampleSizeInBytes;
// If the sampling rate for a stream is unknown, propose an end time that's 1ms
// in the future. I'm not sure why this is necessary, since it seems this should
// be something handled by the aligner itself. But, it's a carry over from the
// ArgentCapture library, where this exact computation was done for data without
// a sampling rate (vision pose, rendered pose, IMU, etc.)
//
// Without this end time computation, we cannot align streams that don't have a
// known sampling rate. This does nothing when the sample rate for a stream is
// known, since the default behavior of feed() is to inline -1.0 for the end time.
const double endTime = (0.0 == context.streams[idx].config.nominalSampleRate)
? (sample.metadata->header.timestamp + 0.001)
: -1.0;
context.streams[idx].interface->feed(
hasSampleCallback() ? (CpuBuffer)sample.payload : nullptr,
buffer_size,
sample.metadata->header.timestamp,
endTime);
}
{
std::lock_guard<std::mutex> lock(context.implMutex);
// If there's no primary stream, finalize all streams that are determined to be
// lagging given the current sample timestamp. If there IS a primary stream,
// finalize all streams that are determined to be lagging given the primary stream's
// latest sample.
if (std::holds_alternative<GlobalMaxLatency>(finalizeStrategy_)) {
const double maxLatency = std::get<GlobalMaxLatency>(finalizeStrategy_);
context.impl->finalizeBefore(sample.metadata->header.timestamp - maxLatency);
} else {
auto& primarySelection = std::get<PrimarySelection>(finalizeStrategy_);
context.impl->finalizeBefore(
sample.metadata->header.timestamp - primarySelection.maxLatencySeconds);
primarySelection.setReference(sample.metadata->header.timestamp);
}
}
if (policy_ == ThreadPolicy::THREAD_NEUTRAL) {
align();
}
}