in watchman/cmds/subscribe.cpp [212:302]
json_ref ClientSubscription::buildSubscriptionResults(
const std::shared_ptr<Root>& root,
ClockSpec& position,
OnStateTransition onStateTransition) {
auto since_spec = query->since_spec.get();
if (const auto* clock = since_spec
? std::get_if<ClockSpec::Clock>(&since_spec->spec)
: nullptr) {
log(DBG,
"running subscription ",
name,
" rules since ",
clock->position.ticks,
"\n");
} else {
log(DBG, "running subscription ", name, " rules (no since)\n");
}
// Subscriptions never need to sync explicitly; we are only dispatched
// at settle points which are by definition sync'd to the present time
query->sync_timeout = std::chrono::milliseconds(0);
// We're called by the io thread, so there's little chance that the root
// could be legitimately blocked by something else. That means that we
// can use a short lock_timeout
query->lock_timeout =
uint32_t(root->config.getInt("subscription_lock_timeout_ms", 100));
logf(DBG, "running subscription {} {}\n", name, fmt::ptr(this));
try {
auto res = w_query_execute(query.get(), root, time_generator, getInterface);
logf(
DBG,
"subscription {} generated {} results\n",
name,
res.resultsArray.array().size());
position = res.clockAtStartOfQuery;
// An SCM operation was interleaved with the query execution. This could
// result in over-reporing query results. Discard our results but, do not
// update the clock in order to allow changes to be reported the next time
// the query is run.
bool scmAwareQuery = since_spec && since_spec->hasScmParams();
if (onStateTransition == OnStateTransition::DontAdvance && scmAwareQuery) {
if (root->stateTransCount.load() != res.stateTransCountAtStartOfQuery) {
log(DBG,
"discarding SCM aware query results, SCM activity interleaved\n");
return nullptr;
}
}
// We can suppress empty results, unless this is a source code aware query
// and the mergeBase has changed or this is a fresh instance.
bool mergeBaseChanged = scmAwareQuery &&
res.clockAtStartOfQuery.scmMergeBase != query->since_spec->scmMergeBase;
if (res.resultsArray.array().empty() && !mergeBaseChanged &&
!res.isFreshInstance) {
updateSubscriptionTicks(&res);
return nullptr;
}
auto response = make_response();
// It is way too much of a hassle to try to recreate the clock value if it's
// not a relative clock spec, and it's only going to happen on the first run
// anyway, so just skip doing that entirely.
if (since_spec &&
std::holds_alternative<ClockSpec::Clock>(since_spec->spec)) {
response.set("since", since_spec->toJson());
}
updateSubscriptionTicks(&res);
response.set(
{{"is_fresh_instance", json_boolean(res.isFreshInstance)},
{"clock", res.clockAtStartOfQuery.toJson()},
{"files", std::move(res.resultsArray)},
{"root", w_string_to_json(root->root_path)},
{"subscription", w_string_to_json(name)},
{"unilateral", json_true()}});
if (res.savedStateInfo) {
response.set({{"saved-state-info", std::move(res.savedStateInfo)}});
}
return response;
} catch (const QueryExecError& e) {
log(ERR, "error running subscription ", name, " query: ", e.what());
return nullptr;
}
}