int Aligner::align()

in Cthulhu/src/SubAlignerImpl.cpp [354:436]


// Attempts to align the queued sample batches of one or all streams against the active
// manifests.
//
// For every examined stream, the batch at the front of the stream's queue is sliced into
// references that are appended to the manifest the stream is currently working on
// (active_manifests_[stream.manifest_upstream_index]). A stream completes a manifest once the
// manifest's end time no longer requires samples beyond the batch's current read position; when
// every registered stream has completed a manifest, finalizeOne() is invoked.
//
// identifier_hint: index into registry_ of the stream whose batches were just received. A
//   negative value means "examine every stream". A non-negative value that does not index
//   registry_ is ignored (no stream is examined) instead of indexing out of range.
//
// Returns nr_manifests_completed (cast to int), which is increased here by the result of
// finalizeOne() each time a manifest becomes complete for all streams.
//
// NOTE(review): nr_manifests_completed is not declared inside this function; presumably it is
// declared elsewhere in this file/class - confirm, and confirm whether it is meant to be a
// per-call count.
int Aligner::align(int identifier_hint) {
  // A non-negative identifier hint means that a batch of samples from the corresponding stream
  // has been received, and we should examine the possibility for alignment of all manifests for
  // that stream.
  std::vector<int> relevant_streams;
  if (identifier_hint < 0) {
    // If the hint is negative, examine all the streams (identifiers 0 .. registry_.size() - 1).
    relevant_streams.resize(registry_.size());
    std::iota(relevant_streams.begin(), relevant_streams.end(), 0);
  } else if (static_cast<size_t>(identifier_hint) < registry_.size()) {
    relevant_streams.push_back(identifier_hint);
  }
  // Otherwise the hint does not refer to a registered stream: leave relevant_streams empty so
  // that registry_ is never indexed (and dereferenced) out of range below.

  for (const int identifier : relevant_streams) {
    bool should_switch = false; // Should switch to another stream?
    auto& stream = *registry_[identifier];

    // Attempt alignment while there are active manifests and batches. Continue until either
    // this stream has completed all manifests or there are no usable batches anymore.
    while (!should_switch && stream.manifest_upstream_index < active_manifests_.size()) {
      auto& manifest = active_manifests_[stream.manifest_upstream_index];

      // Cannot proceed if we don't have batches to align.
      should_switch = stream.batches.empty();

      while (!should_switch && !stream.batches.empty()) {
        auto& batch = stream.batches.front();
        if (batch.buffer_durational.duration.end_time <
            batch.buffer_durational.duration.start_time) {
          should_switch = true;
          break; // Cannot proceed if the batch doesn't have an end time.
        }

        // The manifest proposes the new offset in units of samples within the active batch to be
        // this much... 0.5 is added to enforce the fifth assumption - see above and consider that
        // double -> integer conversion is truncation.
        const auto proposed_sampleoffset = static_cast<int64_t>(
            batch.sample_rate *
                (manifest.duration.end_time - batch.buffer_durational.duration.start_time) +
            0.5);

        if (proposed_sampleoffset <= static_cast<int64_t>(batch.nrsamples_current)) {
          // The request does not require samples beyond the current pointer within the batch.
          // Consider the manifest to be complete for this stream in this case.
          manifest.completed_streams.insert(identifier);
          ++stream.manifest_upstream_index; // Proceed to the next manifest.
          if (manifest.completed_streams.size() == registry_.size())
            nr_manifests_completed += finalizeOne();

          // Break to obtain a reference to the new manifest; `manifest` must not be used past
          // this point.
          break;
        }

        // Here proposed_sampleoffset > batch.nrsamples_current, so the cast to size_t is applied
        // to a positive value. nr_samples can be zero if and only if batch.nrsamples_total ==
        // batch.nrsamples_current, in which case this batch is exhausted and should be dropped.
        // If it is non-zero, insert a reference to the container.
        const size_t nr_samples =
            std::min(static_cast<size_t>(proposed_sampleoffset), batch.nrsamples_total) -
            batch.nrsamples_current;

        if (!nr_samples) {
          // `batch` refers to the popped element and is not used again in this iteration.
          stream.batches.pop_front();
        } else {
          Reference ref;
          ref.nrbytes_offset = batch.nrsamples_current * stream.sample_bytewidth;
          ref.nrbytes_length = nr_samples * stream.sample_bytewidth;
          // Advance the read position before copying the batch into the reference, so the copy
          // carries the updated nrsamples_current.
          batch.nrsamples_current += nr_samples;
          ref.buffer_tagged = batch;

          // Per-stream bookkeeping for the samples that were just handed out.
          stream.nrsamples_processed += nr_samples;
          stream.nrbytes_pending -= ref.nrbytes_length;
          stream.deficit -= nr_samples / batch.sample_rate;
          manifest.references[identifier].push_back(std::move(ref));
        }
      } // while batches are available for this manifest
    } // while this stream has manifests left
  } // for each relevant stream

  return static_cast<int>(nr_manifests_completed);
}