in watchman/cmds/subscribe.cpp [319:441]
static void cmd_flush_subscriptions(Client* clientbase, const json_ref& args) {
auto client = (UserClient*)clientbase;
int sync_timeout;
json_ref subs(nullptr);
// TODO: merge this parse and sync logic with the logic in query evaluation
if (json_array_size(args) == 3) {
auto& sync_timeout_obj = args.at(2).get("sync_timeout");
subs = args.at(2).get_default("subscriptions", nullptr);
if (!sync_timeout_obj.isInt()) {
client->sendErrorResponse("'sync_timeout' must be an integer");
return;
}
sync_timeout = sync_timeout_obj.asInt();
} else {
client->sendErrorResponse(
"wrong number of arguments to 'flush-subscriptions'");
return;
}
auto root = resolveRoot(client, args);
std::vector<w_string> subs_to_sync;
if (subs) {
if (!subs.isArray()) {
client->sendErrorResponse(
"expected 'subscriptions' to be an array of subscription names");
return;
}
for (auto& sub_name : subs.array()) {
if (!sub_name.isString()) {
client->sendErrorResponse(
"expected 'subscriptions' to be an array of subscription names");
return;
}
auto& sub_name_str = json_to_w_string(sub_name);
auto sub_iter = client->subscriptions.find(sub_name_str);
if (sub_iter == client->subscriptions.end()) {
client->sendErrorResponse(
"this client does not have a subscription named '{}'",
sub_name_str);
return;
}
auto& sub = sub_iter->second;
if (sub->root != root) {
client->sendErrorResponse(
"subscription '{}' is on root '{}' different from command root "
"'{}'",
sub_name_str,
sub->root->root_path,
root->root_path);
return;
}
subs_to_sync.push_back(sub_name_str);
}
} else {
// Look for all subscriptions matching this root.
for (auto& sub_iter : client->subscriptions) {
if (sub_iter.second->root == root) {
subs_to_sync.push_back(sub_iter.first);
}
}
}
root->syncToNow(std::chrono::milliseconds(sync_timeout));
auto resp = make_response();
auto synced = json_array();
auto no_sync_needed = json_array();
auto dropped = json_array();
for (auto& sub_name_str : subs_to_sync) {
auto sub_iter = client->subscriptions.find(sub_name_str);
auto& sub = sub_iter->second;
sub_action action;
w_string policy_name;
auto position = root->view()->getMostRecentRootNumberAndTickValue();
std::tie(action, policy_name) =
get_subscription_action(sub.get(), root, position);
if (action == sub_action::drop) {
sub->last_sub_tick = position.ticks;
sub->query->since_spec = std::make_unique<ClockSpec>(position);
log(DBG,
"(flush-subscriptions) dropping subscription notifications for ",
sub->name,
" until state ",
policy_name,
" is vacated. Advanced ticks to ",
sub->last_sub_tick,
"\n");
json_array_append(dropped, w_string_to_json(sub_name_str));
} else {
// flush-subscriptions means that we _should NOT defer_ notifications. So
// ignore defer and defer_vcs.
ClockSpec out_position;
log(DBG,
"(flush-subscriptions) executing subscription ",
sub->name,
"\n");
auto sub_result = sub->buildSubscriptionResults(
root, out_position, OnStateTransition::QueryAnyway);
if (sub_result) {
client->enqueueResponse(std::move(sub_result));
json_array_append(synced, w_string_to_json(sub_name_str));
} else {
json_array_append(no_sync_needed, w_string_to_json(sub_name_str));
}
}
}
resp.set(
{{"synced", std::move(synced)},
{"no_sync_needed", std::move(no_sync_needed)},
{"dropped", std::move(dropped)}});
add_root_warnings_to_response(resp, root);
client->enqueueResponse(std::move(resp));
}