void SubAligner::sampleCallback()

in Cthulhu/src/SubAligner.cpp [342:435]


void SubAligner::sampleCallback(size_t idx, const StreamSample& sample) {
  int activeContext;
  {
    std::lock_guard<std::mutex> lock(globalMutex_);

    activeContext = streams_[idx].activeContext;

    // Force everything onto the latest context, for the case where not all streams are
    // reconfigured simultaneously
    // Note: It may be better to estimate start/end-times for the contexts to determine
    if (activeContext < latestContext_) {
      auto& config = contexts_[activeContext].streams[idx].config;
      streams_[idx].activeContext = latestContext_;
      activeContext = latestContext_;
      auto& context = contexts_.at(activeContext);
      enroll(idx, config, context, lock);
    }
  }

  auto& context = contexts_.at(activeContext);

  if (std::holds_alternative<PrimarySelection>(finalizeStrategy_) &&
      !std::get<PrimarySelection>(finalizeStrategy_)
           .isWithinTolerance(sample.metadata->header.timestamp)) {
    XR_LOGW_EVERY_N_SEC(
        5,
        "Too old sample arrived on stream: #{}, '{}', stamp: {}, tolerance: {}",
        idx,
        streams_[idx].streamID.c_str(),
        sample.metadata->header.timestamp,
        std::get<PrimarySelection>(finalizeStrategy_).maxLatencySeconds);
    // dump sample
    return;
  }

  // feed underlying aligner
  {
    sample.metadata->processingStamps["subaligner_start"] = NOW();
    const auto alignerLock = std::scoped_lock(context.implMutex, context.streams[idx].streamMutex);
    // Store the sample data
    const uint32_t seq = context.streams[idx].sequenceIn++;
    context.streams[idx].sampleMap[seq] = sample;
    if (!hasSampleCallback()) {
      // Drop payload: we don't need to carry this buffer around for alignment, since
      // we won't look at it later.
      context.streams[idx].sampleMap[seq].payload = nullptr;
    }

    // Feed data in aligner, but act only on the "metadata" as the references are already held by
    // sampleMap
    const size_t buffer_size =
        sample.numberOfSubSamples * context.streams[idx].config.sampleSizeInBytes;

    // If the sampling rate for a stream is unknown, propose an end time that's 1ms
    // in the future. I'm not sure why this is necessary, since it seems this should
    // be something handled by the aligner itself. But, it's a carry over from the
    // ArgentCapture library, where this exact computation was done for data without
    // a sampling rate (vision pose, rendered pose, IMU, etc.)
    //
    // Without this end time computation, we cannot align streams that don't have a
    // known sampling rate. This does nothing when the sample rate for a stream is
    // known, since the default behavior of feed() is to inline -1.0 for the end time.
    const double endTime = (0.0 == context.streams[idx].config.nominalSampleRate)
        ? (sample.metadata->header.timestamp + 0.001)
        : -1.0;

    context.streams[idx].interface->feed(
        hasSampleCallback() ? (CpuBuffer)sample.payload : nullptr,
        buffer_size,
        sample.metadata->header.timestamp,
        endTime);
  }

  {
    std::lock_guard<std::mutex> lock(context.implMutex);

    // If there's no primary stream, finalize all streams that are determined to be
    // lagging given the current sample timestamp. If there IS a primary stream,
    // finalize all streams that are determined to be lagging given the primary stream's
    // latest sample.
    if (std::holds_alternative<GlobalMaxLatency>(finalizeStrategy_)) {
      const double maxLatency = std::get<GlobalMaxLatency>(finalizeStrategy_);
      context.impl->finalizeBefore(sample.metadata->header.timestamp - maxLatency);
    } else {
      auto& primarySelection = std::get<PrimarySelection>(finalizeStrategy_);
      context.impl->finalizeBefore(
          sample.metadata->header.timestamp - primarySelection.maxLatencySeconds);
      primarySelection.setReference(sample.metadata->header.timestamp);
    }
  }
  if (policy_ == ThreadPolicy::THREAD_NEUTRAL) {
    align();
  }
}