void SubAligner::processManifests()

in Cthulhu/src/SubAligner.cpp [144:294]


void SubAligner::processManifests(
    const std::vector<subaligner::Manifest>& manifests,
    const std::lock_guard<std::mutex>& globalMutexLock,
    AlignerContext& context) {
  // Flatten data and execute callbacks
  for (auto& manifest : manifests) {
    if (manifest.completed_streams.size() != streams_.size()) {
      for (auto& stream : manifest.references) {
        auto sindex = context.lookupIndex.at(stream.first);
        auto sampleSize =
            std::max(context.streams.at(sindex).config.sampleSizeInBytes, uint32_t(1));
        auto& sampleMap = context.streams.at(sindex).sampleMap;
        for (auto& r : stream.second) {
          if (r.nrbytes_offset + r.nrbytes_length ==
              (r.buffer_tagged.nrsamples_total * sampleSize)) {
            sampleMap.erase(r.buffer_tagged.sequence_number);
          }
        }
      }

      // Log which stream(s) was (were) missing, as it helps with troublshooting
      fmt::memory_buffer buffer;
      bool comma = false;
      for (int stream_idx = 0; stream_idx < streams_.size(); ++stream_idx) {
        if (0 == manifest.completed_streams.count(stream_idx)) {
          fmt::format_to(buffer, "{}{}", comma ? "," : "", streams_[stream_idx].streamID);
          comma = true;
        }
      }

      XR_LOGW_EVERY_N_SEC(
          5.0,
          "Subaligner::processManifests - Finalized an incomplete manifest, missing: {}",
          buffer.data());
      continue;
    }
    // check if we need to bother with creating output samples when nobody consumes them (e.g. only
    // records alignment metadata)
    const bool samplesNeeded = hasSampleCallback() && !inhibitSampleCallback_;

    std::vector<StreamSample> samples(streams_.size());
    AlignerSamplesMeta samplesMeta(streams_.size());
    for (auto& stream : manifest.references) {
      auto sindex = context.lookupIndex.at(stream.first);
      std::lock_guard<std::mutex> lock(context.streams.at(sindex).streamMutex);
      auto* const sample = samplesNeeded ? &samples[sindex] : nullptr;
      auto& sampleMeta = samplesMeta[sindex];
      if (stream.second.size() > 0) {
        size_t length = 0;
        auto& sampleMap = context.streams.at(sindex).sampleMap;
        const uint32_t sampleSize = context.streams.at(sindex).config.sampleSizeInBytes;
        sampleMeta.references.resize(stream.second.size());
        size_t ridx = 0;
        for (auto& r : stream.second) {
          if (sampleMap.find(r.buffer_tagged.sequence_number) == sampleMap.end()) {
            XR_LOGD(
                "Subaligner::processManifests - Attempted to close a reference for which we've "
                "don't have a sample.");
            return;
          }
          sampleMeta.references[ridx].timestamp =
              sampleMap[r.buffer_tagged.sequence_number].metadata->header.timestamp;
          sampleMeta.references[ridx].sequenceNumber =
              sampleMap[r.buffer_tagged.sequence_number].metadata->header.sequenceNumber;
          sampleMeta.references[ridx].subSampleOffset = r.nrbytes_offset / sampleSize;
          sampleMeta.references[ridx].numSubSamples = r.nrbytes_length / sampleSize;
          length += r.nrbytes_length;
          std::string sequenceString = std::to_string(r.buffer_tagged.sequence_number);
          // Copy full history
          if (sample) {
            sample->metadata->history["subaligner_" + sequenceString] =
                sampleMap[r.buffer_tagged.sequence_number].metadata;
            sample->metadata->processingStamps["subaligner_" + sequenceString + "_start"] =
                sampleMap[r.buffer_tagged.sequence_number]
                    .metadata->processingStamps["subaligner_start"];
          }
          ++ridx;
        }
        uint8_t* ptr = nullptr;
        if (sample) {
          // allocate buffer for the output "flattened" sample
          sample->parameters = sampleMap[stream.second[0].buffer_tagged.sequence_number].parameters;
          sample->numberOfSubSamples = length / sampleSize;
          sample->payload = Framework::instance().memoryPool()->getBufferFromPool(
              streams_.at(sindex).streamID, length);
          ptr = ((CpuBuffer)sample->payload).get();
        }
        for (auto& r : stream.second) {
          if (ptr) {
            std::copy(
                r.buffer_tagged.buffer_durational.buffer.get() + r.nrbytes_offset,
                r.buffer_tagged.buffer_durational.buffer.get() + r.nrbytes_offset +
                    r.nrbytes_length,
                ptr);
            ptr += r.nrbytes_length;
          }

          if (r.nrbytes_offset + r.nrbytes_length ==
                  (r.buffer_tagged.nrsamples_total * sampleSize) &&
              sampleMap.find(r.buffer_tagged.sequence_number) != sampleMap.end()) {
            sampleMap.erase(r.buffer_tagged.sequence_number);
          }
        }
        // finalize sample header, first sample might be partial, compensate for the potential
        // offset
        const double samplePeriod = 1.0 / stream.second[0].buffer_tagged.sample_rate;
        const double sample_timestamp =
            stream.second[0].buffer_tagged.buffer_durational.duration.start_time +
            double(stream.second[0].nrbytes_offset / sampleSize) * samplePeriod;
        double sample_duration = double(length) / sampleSize * samplePeriod;
        if (sample) {
          sample->metadata->header.sequenceNumber = streams_.at(sindex).sequenceOut++;
          sample->metadata->header.timestamp = sample_timestamp;
        }
        // effective computed duration of the manifest for this sample
        sampleMeta.timestamp = sample_timestamp;
        sampleMeta.duration = sample_duration;
      }
    }
    // Check configs and execute
    if (!context.configured) {
      std::vector<StreamConfig> configs;
      configs.reserve(context.streams.size());
      AlignerConfigsMeta configsMeta;
      configsMeta.reserve(context.streams.size());
      size_t idx = 0;
      for (const auto& stream : context.streams) {
        configs.push_back(stream.second.config);
        configsMeta.push_back(
            AlignerStreamMeta{streams_[idx].streamID, stream.second.config.sampleSizeInBytes});
        idx++;
      }
      if (configs.size() == streams_.size()) {
        inhibitSampleCallback_ = !alignedConfigCallback(configs);
        context.configured = true;
        alignedConfigsMetaCallback(configsMeta);
      }
    }

    if (!inhibitSampleCallback_) {
      alignedSamplesMetaCallback(samplesMeta);
      if (samplesNeeded) {
        const auto seconds = NOW();
        for (const auto& sample : samples) {
          sample.metadata->processingStamps["subaligner_end"] = seconds;
        }
        alignedCallback(samples);
      }
    }
  }
}