in Cthulhu/src/SubAligner.cpp [144:294]
/**
 * Processes finalized manifests: flattens the referenced sub-buffers of every stream into a
 * single output sample per stream and dispatches the aligned config/sample callbacks.
 *
 * For each manifest:
 *  - If the manifest does not cover every stream in streams_, fully consumed source samples
 *    are dropped from the per-stream sample maps, a rate-limited warning listing the missing
 *    stream IDs is logged, and the manifest is skipped.
 *  - Otherwise per-stream alignment metadata is always built; output payloads are only built
 *    when a sample callback is registered and sample callbacks are not inhibited.
 *
 * @param manifests        Manifests to process, in order.
 * @param globalMutexLock  Not used in the body. NOTE(review): presumably serves as proof that
 *                         the caller holds the global mutex -- confirm against callers.
 * @param context          Aligner state (index lookup, per-stream config, mutex and sample map,
 *                         and the "configured" flag). Modified in place.
 *
 * NOTE: if a reference points at a sequence number that is absent from its stream's sample map,
 * the function returns immediately; the current manifest is left partially processed and all
 * remaining manifests are not processed.
 */
void SubAligner::processManifests(
    const std::vector<subaligner::Manifest>& manifests,
    const std::lock_guard<std::mutex>& globalMutexLock,
    AlignerContext& context) {
  // Flatten data and execute callbacks
  for (const auto& manifest : manifests) {
    if (manifest.completed_streams.size() != streams_.size()) {
      // Incomplete manifest: no output is produced, but source samples whose last byte is
      // referenced here are still released.
      for (const auto& stream : manifest.references) {
        const auto sindex = context.lookupIndex.at(stream.first);
        auto& streamContext = context.streams.at(sindex);
        // Same per-stream lock the complete-manifest path takes before touching the sample map.
        std::lock_guard<std::mutex> lock(streamContext.streamMutex);
        const auto sampleSize = std::max(streamContext.config.sampleSizeInBytes, uint32_t(1));
        auto& sampleMap = streamContext.sampleMap;
        for (const auto& r : stream.second) {
          if (r.nrbytes_offset + r.nrbytes_length ==
              (r.buffer_tagged.nrsamples_total * sampleSize)) {
            sampleMap.erase(r.buffer_tagged.sequence_number);
          }
        }
      }
      // Log which stream(s) was (were) missing, as it helps with troubleshooting
      fmt::memory_buffer buffer;
      bool comma = false;
      const int streamCount = static_cast<int>(streams_.size());
      for (int stream_idx = 0; stream_idx < streamCount; ++stream_idx) {
        if (0 == manifest.completed_streams.count(stream_idx)) {
          fmt::format_to(buffer, "{}{}", comma ? "," : "", streams_[stream_idx].streamID);
          comma = true;
        }
      }
      // fmt::memory_buffer is not NUL-terminated, so it must be passed with an explicit length
      // rather than as a bare char pointer.
      XR_LOGW_EVERY_N_SEC(
          5.0,
          "Subaligner::processManifests - Finalized an incomplete manifest, missing: {}",
          std::string(buffer.data(), buffer.size()));
      continue;
    }

    // check if we need to bother with creating output samples when nobody consumes them (e.g.
    // only records alignment metadata)
    const bool samplesNeeded = hasSampleCallback() && !inhibitSampleCallback_;
    std::vector<StreamSample> samples(streams_.size());
    AlignerSamplesMeta samplesMeta(streams_.size());
    for (const auto& stream : manifest.references) {
      const auto sindex = context.lookupIndex.at(stream.first);
      auto& streamContext = context.streams.at(sindex);
      std::lock_guard<std::mutex> lock(streamContext.streamMutex);
      auto* const sample = samplesNeeded ? &samples[sindex] : nullptr;
      auto& sampleMeta = samplesMeta[sindex];
      if (stream.second.size() == 0) {
        continue;
      }

      size_t length = 0;
      auto& sampleMap = streamContext.sampleMap;
      // Clamp to 1 (as the incomplete-manifest path does) so a zero sample size cannot cause an
      // integer division by zero below.
      const uint32_t sampleSize = std::max(streamContext.config.sampleSizeInBytes, uint32_t(1));
      sampleMeta.references.resize(stream.second.size());
      size_t ridx = 0;
      for (const auto& r : stream.second) {
        // Single lookup; every later access to the source sample goes through this iterator.
        const auto found = sampleMap.find(r.buffer_tagged.sequence_number);
        if (found == sampleMap.end()) {
          XR_LOGD(
              "Subaligner::processManifests - Attempted to close a reference for which we "
              "don't have a sample.");
          // Aborts the whole call, not just this manifest (see function comment).
          return;
        }
        auto& sourceSample = found->second;
        auto& reference = sampleMeta.references[ridx];
        reference.timestamp = sourceSample.metadata->header.timestamp;
        reference.sequenceNumber = sourceSample.metadata->header.sequenceNumber;
        reference.subSampleOffset = r.nrbytes_offset / sampleSize;
        reference.numSubSamples = r.nrbytes_length / sampleSize;
        length += r.nrbytes_length;
        // Copy full history
        if (sample) {
          const std::string prefix =
              "subaligner_" + std::to_string(r.buffer_tagged.sequence_number);
          sample->metadata->history[prefix] = sourceSample.metadata;
          sample->metadata->processingStamps[prefix + "_start"] =
              sourceSample.metadata->processingStamps["subaligner_start"];
        }
        ++ridx;
      }

      uint8_t* ptr = nullptr;
      if (sample) {
        // allocate buffer for the output "flattened" sample; the first reference's source sample
        // is known to be present because the loop above verified every reference.
        sample->parameters = sampleMap[stream.second[0].buffer_tagged.sequence_number].parameters;
        sample->numberOfSubSamples = length / sampleSize;
        sample->payload = Framework::instance().memoryPool()->getBufferFromPool(
            streams_.at(sindex).streamID, length);
        ptr = ((CpuBuffer)sample->payload).get();
      }
      for (const auto& r : stream.second) {
        if (ptr) {
          const auto* const begin =
              r.buffer_tagged.buffer_durational.buffer.get() + r.nrbytes_offset;
          std::copy(begin, begin + r.nrbytes_length, ptr);
          ptr += r.nrbytes_length;
        }
        // Release the source sample once its final byte has been consumed. Erasing by key is a
        // no-op when the entry is already gone, so no prior lookup is needed.
        if (r.nrbytes_offset + r.nrbytes_length ==
            (r.buffer_tagged.nrsamples_total * sampleSize)) {
          sampleMap.erase(r.buffer_tagged.sequence_number);
        }
      }

      // finalize sample header, first sample might be partial, compensate for the potential
      // offset
      const auto& head = stream.second[0];
      const double samplePeriod = 1.0 / head.buffer_tagged.sample_rate;
      const double sample_timestamp = head.buffer_tagged.buffer_durational.duration.start_time +
          double(head.nrbytes_offset / sampleSize) * samplePeriod;
      const double sample_duration = double(length) / sampleSize * samplePeriod;
      if (sample) {
        sample->metadata->header.sequenceNumber = streams_.at(sindex).sequenceOut++;
        sample->metadata->header.timestamp = sample_timestamp;
      }
      // effective computed duration of the manifest for this sample
      sampleMeta.timestamp = sample_timestamp;
      sampleMeta.duration = sample_duration;
    }

    // Check configs and execute. The size check comes first so that streams_ is never indexed
    // past its end while walking context.streams.
    if (!context.configured && context.streams.size() == streams_.size()) {
      std::vector<StreamConfig> configs;
      configs.reserve(context.streams.size());
      AlignerConfigsMeta configsMeta;
      configsMeta.reserve(context.streams.size());
      size_t idx = 0;
      for (const auto& stream : context.streams) {
        configs.push_back(stream.second.config);
        configsMeta.push_back(
            AlignerStreamMeta{streams_[idx].streamID, stream.second.config.sampleSizeInBytes});
        ++idx;
      }
      // A false return from the config callback inhibits all subsequent sample callbacks.
      inhibitSampleCallback_ = !alignedConfigCallback(configs);
      context.configured = true;
      alignedConfigsMetaCallback(configsMeta);
    }

    if (!inhibitSampleCallback_) {
      alignedSamplesMetaCallback(samplesMeta);
      if (samplesNeeded) {
        const auto seconds = NOW();
        for (const auto& sample : samples) {
          sample.metadata->processingStamps["subaligner_end"] = seconds;
        }
        alignedCallback(samples);
      }
    }
  }
}