in watchman/cmds/subscribe.cpp [482:623]
static void cmd_subscribe(Client* clientbase, const json_ref& args) {
std::shared_ptr<ClientSubscription> sub;
json_ref resp, initial_subscription_results;
json_ref jfield_list;
json_ref jname;
std::shared_ptr<Query> query;
json_ref query_spec;
json_ref defer_list;
json_ref drop_list;
UserClient* client = (UserClient*)clientbase;
if (json_array_size(args) != 4) {
client->sendErrorResponse("wrong number of arguments for subscribe");
return;
}
auto root = resolveRoot(client, args);
jname = args.at(2);
if (!jname.isString()) {
client->sendErrorResponse("expected 2nd parameter to be subscription name");
return;
}
query_spec = args.at(3);
query = parseQuery(root, query_spec);
query->clientPid = client->stm ? client->stm->getPeerProcessID() : 0;
query->subscriptionName = json_to_w_string(jname);
defer_list = query_spec.get_default("defer");
if (defer_list && !defer_list.isArray()) {
client->sendErrorResponse("defer field must be an array of strings");
return;
}
drop_list = query_spec.get_default("drop");
if (drop_list && !drop_list.isArray()) {
client->sendErrorResponse("drop field must be an array of strings");
return;
}
sub = std::make_shared<ClientSubscription>(root, client->shared_from_this());
sub->name = json_to_w_string(jname);
sub->query = query;
auto defer = query_spec.get_default("defer_vcs", json_true());
if (!defer.isBool()) {
client->sendErrorResponse("defer_vcs must be boolean");
return;
}
sub->vcs_defer = defer.asBool();
if (drop_list || defer_list) {
size_t i;
if (defer_list) {
for (i = 0; i < json_array_size(defer_list); i++) {
sub->drop_or_defer[json_to_w_string(json_array_get(defer_list, i))] =
false;
}
}
if (drop_list) {
for (i = 0; i < json_array_size(drop_list); i++) {
sub->drop_or_defer[json_to_w_string(json_array_get(drop_list, i))] =
true;
}
}
}
// If they want SCM aware results we should wait for SCM events to finish
// before dispatching subscriptions
if (query->since_spec && query->since_spec->hasScmParams()) {
sub->vcs_defer = true;
// If they didn't specify any drop/defer behavior, default to a reasonable
// setting that works together with the fsmonitor extension for hg.
if (mapContainsAny(sub->drop_or_defer, "hg.update", "hg.transaction")) {
sub->drop_or_defer["hg.update"] = false; // defer
sub->drop_or_defer["hg.transaction"] = false; // defer
}
}
// Connect the root to our subscription
{
auto client_id = w_string::build(client->unique_id);
auto client_stream = w_string::build(fmt::ptr(client->stm.get()));
auto info_json = json_object(
{{"name", w_string_to_json(sub->name)},
{"query", sub->query->query_spec},
{"client", w_string_to_json(client_id)},
{"stm", w_string_to_json(client_stream)},
{"is_owner", json_boolean(client->stm->peerIsOwner())},
{"pid", json_integer(client->stm->getPeerProcessID())}});
std::weak_ptr<Client> clientRef(client->shared_from_this());
client->unilateralSub.insert(std::make_pair(
sub,
root->unilateralResponses->subscribe(
[clientRef, sub]() {
auto client = clientRef.lock();
if (client) {
client->ping->notify();
}
},
info_json)));
}
client->subscriptions[sub->name] = sub;
resp = make_response();
resp.set("subscribe", json_ref(jname));
add_root_warnings_to_response(resp, root);
ClockSpec position;
initial_subscription_results = sub->buildSubscriptionResults(
root, position, OnStateTransition::DontAdvance);
resp.set("clock", position.toJson());
auto saved_state_info =
initial_subscription_results.get_default("saved-state-info");
if (saved_state_info) {
resp.set("saved-state-info", std::move(saved_state_info));
}
auto asserted_states = json_array();
{
auto rootAssertedStates = root->assertedStates.rlock();
for (const auto& key : sub->drop_or_defer) {
if (rootAssertedStates->isStateAsserted(key.first)) {
// Not sure what to do in case of failure here. -jupi
json_array_append(asserted_states, w_string_to_json(key.first));
}
}
}
resp.set("asserted-states", json_ref(asserted_states));
client->enqueueResponse(std::move(resp));
if (initial_subscription_results) {
client->enqueueResponse(std::move(initial_subscription_results));
}
}