in Cthulhu/src/SubAlignerImpl.cpp [354:436]
int Aligner::align(int identifier_hint) {
bool should_switch = false; // Should switch to another stream?
// A non-negative identifier hint means that a batch of samples from the corresponding stream
// has been received, and we should examine the possiblity for alignment of all manifests for
// that stream.
std::vector<int> relevant_streams;
if (identifier_hint >= 0)
relevant_streams.push_back(identifier_hint);
else {
// If the hint is negative, examine all the streams.
relevant_streams.resize(registry_.size());
std::iota(relevant_streams.begin(), relevant_streams.end(), 0);
}
for (const auto& identifier : relevant_streams) {
should_switch = false;
auto& stream = *registry_[identifier];
// Attempt for alignment if there are active manifests and batches. Continue until
// either all manifests are complete of this stream or there are no batches anymore.
while (!should_switch && stream.manifest_upstream_index < active_manifests_.size()) {
auto& manifest = active_manifests_[stream.manifest_upstream_index];
should_switch = stream.batches.empty();
// Cannot proceed if we don't have batches for alignment business.
while (!should_switch && !stream.batches.empty()) {
auto& batch = stream.batches.front();
if (batch.buffer_durational.duration.end_time <
batch.buffer_durational.duration.start_time) {
should_switch = true;
break; // Cannot proceed if the batch doesn't have an end time.
}
// The manifest proposes the new offset in units of samples within the active batch to be
// this much... 0.5 is added to enforce the fifth assumption - see above and consider that
// double -> size_t is truncation.
auto proposed_sampleoffset = static_cast<int64_t>(
batch.sample_rate *
(manifest.duration.end_time - batch.buffer_durational.duration.start_time) +
0.5);
if (proposed_sampleoffset <= static_cast<int64_t>(batch.nrsamples_current)) {
// This means that the request does not require samples beyond the current pointer
// within the batch. Consider the manifest to be complete of this stream in this case.
manifest.completed_streams.insert(identifier);
stream.manifest_upstream_index++; // Proceed to the next manifest.
if (manifest.completed_streams.size() == registry_.size())
nr_manifests_completed += finalizeOne();
// Break to obtain reference to the new manifest.
break;
} else {
// nr_samples can only be zero if and and only if batch.nrsamples_total =
// batch.nrsamples_current, in which case this batch should be dropped. If it is
// non-zero, insert a reference to the container.
size_t nr_samples =
std::min(static_cast<size_t>(proposed_sampleoffset), batch.nrsamples_total) -
batch.nrsamples_current;
if (!nr_samples)
stream.batches.pop_front();
else {
Reference ref;
ref.nrbytes_offset = batch.nrsamples_current * stream.sample_bytewidth;
ref.nrbytes_length = nr_samples * stream.sample_bytewidth;
batch.nrsamples_current += nr_samples;
ref.buffer_tagged = batch;
stream.nrsamples_processed += nr_samples;
stream.nrbytes_pending -= ref.nrbytes_length;
stream.deficit -= nr_samples / batch.sample_rate;
manifest.references[identifier].push_back(std::move(ref));
} // if (nr_samples)
} // if (proposed > current)
} // stream.batches.empty
} // head < size.
} // identifiers.
return static_cast<int>(nr_manifests_completed);
}