json_ref ClientSubscription::buildSubscriptionResults()

in watchman/cmds/subscribe.cpp [212:302]


/**
 * Runs this subscription's query against `root` and packages the outcome as a
 * unilateral subscription response.
 *
 * `position` is overwritten with the clock captured at the start of the query
 * whenever the query executes without throwing, including on the paths below
 * that return nullptr. It is left untouched if the query throws.
 *
 * Returns nullptr (nothing to deliver) when:
 *  - the query is SCM aware, `onStateTransition` is DontAdvance, and the
 *    root's state transition counter moved while the query was running;
 *  - the query produced no files, the merge base did not change, and this is
 *    not a fresh instance;
 *  - the query raised a QueryExecError (which is logged at ERR level).
 */
json_ref ClientSubscription::buildSubscriptionResults(
    const std::shared_ptr<Root>& root,
    ClockSpec& position,
    OnStateTransition onStateTransition) {
  auto* sinceSpec = query->since_spec.get();

  // Only a clock-flavored since spec carries a tick count we can log.
  const ClockSpec::Clock* sinceClock = nullptr;
  if (sinceSpec) {
    sinceClock = std::get_if<ClockSpec::Clock>(&sinceSpec->spec);
  }

  if (!sinceClock) {
    log(DBG, "running subscription ", name, " rules (no since)\n");
  } else {
    log(DBG,
        "running subscription ",
        name,
        " rules since ",
        sinceClock->position.ticks,
        "\n");
  }

  // No explicit sync is required for subscriptions: they are dispatched only
  // at settle points, which are by definition sync'd to the present time.
  query->sync_timeout = std::chrono::milliseconds(0);

  // This runs on the io thread, so it is unlikely that anything else is
  // legitimately holding the root; a short lock_timeout is sufficient.
  const auto lockTimeoutMs =
      root->config.getInt("subscription_lock_timeout_ms", 100);
  query->lock_timeout = static_cast<uint32_t>(lockTimeoutMs);

  logf(DBG, "running subscription {} {}\n", name, fmt::ptr(this));

  try {
    auto result =
        w_query_execute(query.get(), root, time_generator, getInterface);

    logf(
        DBG,
        "subscription {} generated {} results\n",
        name,
        result.resultsArray.array().size());

    position = result.clockAtStartOfQuery;

    // If an SCM operation was interleaved with the query execution, the
    // results may over-report. Throw them away, and deliberately skip
    // updateSubscriptionTicks() so that the changes are picked up again the
    // next time the query runs.
    const bool scmAwareQuery = sinceSpec && sinceSpec->hasScmParams();
    const bool scmActivityInterleaved =
        onStateTransition == OnStateTransition::DontAdvance && scmAwareQuery &&
        root->stateTransCount.load() != result.stateTransCountAtStartOfQuery;
    if (scmActivityInterleaved) {
      log(DBG,
          "discarding SCM aware query results, SCM activity interleaved\n");
      return nullptr;
    }

    // Empty results are normally suppressed. The exceptions are a source code
    // aware query whose merge base moved, and a fresh instance: both must be
    // reported even when there are no files.
    const bool mergeBaseChanged = scmAwareQuery &&
        result.clockAtStartOfQuery.scmMergeBase !=
            query->since_spec->scmMergeBase;
    const bool mustReportEvenIfEmpty =
        mergeBaseChanged || result.isFreshInstance;
    if (result.resultsArray.array().empty() && !mustReportEvenIfEmpty) {
      updateSubscriptionTicks(&result);
      return nullptr;
    }

    auto reply = make_response();

    // Echo "since" back only for a relative clock spec. Recreating the clock
    // value for any other kind of spec is too much hassle and could only
    // matter on the first run, so it is skipped entirely.
    if (sinceSpec) {
      if (std::holds_alternative<ClockSpec::Clock>(sinceSpec->spec)) {
        reply.set("since", sinceSpec->toJson());
      }
    }
    updateSubscriptionTicks(&result);

    reply.set(
        {{"is_fresh_instance", json_boolean(result.isFreshInstance)},
         {"clock", result.clockAtStartOfQuery.toJson()},
         {"files", std::move(result.resultsArray)},
         {"root", w_string_to_json(root->root_path)},
         {"subscription", w_string_to_json(name)},
         {"unilateral", json_true()}});
    if (result.savedStateInfo) {
      reply.set({{"saved-state-info", std::move(result.savedStateInfo)}});
    }

    return reply;
  } catch (const QueryExecError& e) {
    log(ERR, "error running subscription ", name, " query: ", e.what());
    return nullptr;
  }
}