void QueueingAligner::align()

in Cthulhu/src/QueueingAligner.cpp [43:136]


void QueueingAligner::align() {
  if (!finalized_) {
    return;
  }

  std::chrono::time_point<std::chrono::steady_clock> start = std::chrono::steady_clock::now();

  std::vector<StreamSample> samples;
  samples.reserve(queues_.size());
  AlignerSamplesMeta samplesMeta;
  samplesMeta.reserve(queues_.size());
  {
    // We're safe to access queues_.size() outside of the lock,
    // since we guarantee that the size will not change after
    // finalization.
    std::lock_guard<std::mutex> lock(queueMutex_);

    if (!configured_) {
      // Try to configure
      bool updateConfig = true;
      for (auto& queue : queues_) {
        updateConfig = updateConfig && queue.hasConfig;
      }
      if (updateConfig) {
        std::vector<StreamConfig> configs;
        configs.reserve(queues_.size());
        AlignerConfigsMeta meta;
        meta.reserve(queues_.size());
        for (const auto& queue : queues_) {
          configs.push_back(queue.config);
          meta.push_back(AlignerStreamMeta{queue.id, queue.config.sampleSizeInBytes});
        }
        inhibitSampleCallback_ = !alignedConfigCallback(configs);
        configured_ = true;
        alignedConfigsMetaCallback(meta);
      }
    }

    if (configured_) {
      // Aggregate samples and meta
      for (auto& queue : queues_) {
        StreamSample sample;
        AlignerSampleMeta meta;
        if (!queue.samples.empty()) {
          sample.parameters = queue.samples.front().parameters;
          sample.metadata = queue.samples.front().metadata;
          meta.timestamp = queue.samples.front().metadata->header.timestamp;
          // TBD: Should we propagate the history timestamps all of inputs?
        }
        // Note: we still propagate empty samples
        int payloadSize = std::accumulate(
            queue.samples.begin(),
            queue.samples.end(),
            0,
            [](int val, const StreamSample& next) -> int { return val + next.numberOfSubSamples; });
        sample.payload = Framework::instance().memoryPool()->getBufferFromPool(
            queue.id, payloadSize * queue.config.sampleSizeInBytes);
        meta.references.reserve(queue.samples.size());
        int index = 0;
        for (const auto& inputSample : queue.samples) {
          AlignerReferenceMeta reference;
          std::memcpy(
              ((CpuBuffer)sample.payload).get() + index * queue.config.sampleSizeInBytes,
              ((CpuBuffer)inputSample.payload).get(),
              inputSample.numberOfSubSamples * queue.config.sampleSizeInBytes);
          index += inputSample.numberOfSubSamples;
          reference.sequenceNumber = inputSample.metadata->header.sequenceNumber;
          reference.subSampleOffset = 0;
          reference.numSubSamples = inputSample.numberOfSubSamples;
          meta.references.push_back(reference);
        }
        sample.numberOfSubSamples = payloadSize;
        samples.push_back(sample);
        samplesMeta.push_back(meta);
        queue.samples.clear();
      }
    }
  }

  if (samples.size() == queues_.size()) {
    if (!inhibitSampleCallback_) {
      alignedSamplesMetaCallback(samplesMeta);
      alignedCallback(samples);
    }
  }

  std::chrono::time_point<std::chrono::steady_clock> end = std::chrono::steady_clock::now();
  auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
  // The AlignerBase will sleep for 1ms, so offset this in our calculation
  int delayInMs = (1000.0 / outputRate_) - ms - 1.0;
  if (delayInMs > 0.0) {
    std::this_thread::sleep_for(std::chrono::milliseconds(delayInMs));
  }
}