in Cthulhu/src/QueueingAligner.cpp [43:136]
void QueueingAligner::align() {
if (!finalized_) {
return;
}
std::chrono::time_point<std::chrono::steady_clock> start = std::chrono::steady_clock::now();
std::vector<StreamSample> samples;
samples.reserve(queues_.size());
AlignerSamplesMeta samplesMeta;
samplesMeta.reserve(queues_.size());
{
// We're safe to access queues_.size() outside of the lock,
// since we guarantee that the size will not change after
// finalization.
std::lock_guard<std::mutex> lock(queueMutex_);
if (!configured_) {
// Try to configure
bool updateConfig = true;
for (auto& queue : queues_) {
updateConfig = updateConfig && queue.hasConfig;
}
if (updateConfig) {
std::vector<StreamConfig> configs;
configs.reserve(queues_.size());
AlignerConfigsMeta meta;
meta.reserve(queues_.size());
for (const auto& queue : queues_) {
configs.push_back(queue.config);
meta.push_back(AlignerStreamMeta{queue.id, queue.config.sampleSizeInBytes});
}
inhibitSampleCallback_ = !alignedConfigCallback(configs);
configured_ = true;
alignedConfigsMetaCallback(meta);
}
}
if (configured_) {
// Aggregate samples and meta
for (auto& queue : queues_) {
StreamSample sample;
AlignerSampleMeta meta;
if (!queue.samples.empty()) {
sample.parameters = queue.samples.front().parameters;
sample.metadata = queue.samples.front().metadata;
meta.timestamp = queue.samples.front().metadata->header.timestamp;
// TBD: Should we propagate the history timestamps all of inputs?
}
// Note: we still propagate empty samples
int payloadSize = std::accumulate(
queue.samples.begin(),
queue.samples.end(),
0,
[](int val, const StreamSample& next) -> int { return val + next.numberOfSubSamples; });
sample.payload = Framework::instance().memoryPool()->getBufferFromPool(
queue.id, payloadSize * queue.config.sampleSizeInBytes);
meta.references.reserve(queue.samples.size());
int index = 0;
for (const auto& inputSample : queue.samples) {
AlignerReferenceMeta reference;
std::memcpy(
((CpuBuffer)sample.payload).get() + index * queue.config.sampleSizeInBytes,
((CpuBuffer)inputSample.payload).get(),
inputSample.numberOfSubSamples * queue.config.sampleSizeInBytes);
index += inputSample.numberOfSubSamples;
reference.sequenceNumber = inputSample.metadata->header.sequenceNumber;
reference.subSampleOffset = 0;
reference.numSubSamples = inputSample.numberOfSubSamples;
meta.references.push_back(reference);
}
sample.numberOfSubSamples = payloadSize;
samples.push_back(sample);
samplesMeta.push_back(meta);
queue.samples.clear();
}
}
}
if (samples.size() == queues_.size()) {
if (!inhibitSampleCallback_) {
alignedSamplesMetaCallback(samplesMeta);
alignedCallback(samples);
}
}
std::chrono::time_point<std::chrono::steady_clock> end = std::chrono::steady_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
// The AlignerBase will sleep for 1ms, so offset this in our calculation
int delayInMs = (1000.0 / outputRate_) - ms - 1.0;
if (delayInMs > 0.0) {
std::this_thread::sleep_for(std::chrono::milliseconds(delayInMs));
}
}