static void cmd_subscribe()

in watchman/cmds/subscribe.cpp [482:623]


static void cmd_subscribe(Client* clientbase, const json_ref& args) {
  std::shared_ptr<ClientSubscription> sub;
  json_ref resp, initial_subscription_results;
  json_ref jfield_list;
  json_ref jname;
  std::shared_ptr<Query> query;
  json_ref query_spec;
  json_ref defer_list;
  json_ref drop_list;
  UserClient* client = (UserClient*)clientbase;

  if (json_array_size(args) != 4) {
    client->sendErrorResponse("wrong number of arguments for subscribe");
    return;
  }

  auto root = resolveRoot(client, args);

  jname = args.at(2);
  if (!jname.isString()) {
    client->sendErrorResponse("expected 2nd parameter to be subscription name");
    return;
  }

  query_spec = args.at(3);

  query = parseQuery(root, query_spec);
  query->clientPid = client->stm ? client->stm->getPeerProcessID() : 0;
  query->subscriptionName = json_to_w_string(jname);

  defer_list = query_spec.get_default("defer");
  if (defer_list && !defer_list.isArray()) {
    client->sendErrorResponse("defer field must be an array of strings");
    return;
  }

  drop_list = query_spec.get_default("drop");
  if (drop_list && !drop_list.isArray()) {
    client->sendErrorResponse("drop field must be an array of strings");
    return;
  }

  sub = std::make_shared<ClientSubscription>(root, client->shared_from_this());

  sub->name = json_to_w_string(jname);
  sub->query = query;

  auto defer = query_spec.get_default("defer_vcs", json_true());
  if (!defer.isBool()) {
    client->sendErrorResponse("defer_vcs must be boolean");
    return;
  }
  sub->vcs_defer = defer.asBool();

  if (drop_list || defer_list) {
    size_t i;

    if (defer_list) {
      for (i = 0; i < json_array_size(defer_list); i++) {
        sub->drop_or_defer[json_to_w_string(json_array_get(defer_list, i))] =
            false;
      }
    }
    if (drop_list) {
      for (i = 0; i < json_array_size(drop_list); i++) {
        sub->drop_or_defer[json_to_w_string(json_array_get(drop_list, i))] =
            true;
      }
    }
  }

  // If they want SCM aware results we should wait for SCM events to finish
  // before dispatching subscriptions
  if (query->since_spec && query->since_spec->hasScmParams()) {
    sub->vcs_defer = true;

    // If they didn't specify any drop/defer behavior, default to a reasonable
    // setting that works together with the fsmonitor extension for hg.
    if (mapContainsAny(sub->drop_or_defer, "hg.update", "hg.transaction")) {
      sub->drop_or_defer["hg.update"] = false; // defer
      sub->drop_or_defer["hg.transaction"] = false; // defer
    }
  }

  // Connect the root to our subscription
  {
    auto client_id = w_string::build(client->unique_id);
    auto client_stream = w_string::build(fmt::ptr(client->stm.get()));
    auto info_json = json_object(
        {{"name", w_string_to_json(sub->name)},
         {"query", sub->query->query_spec},
         {"client", w_string_to_json(client_id)},
         {"stm", w_string_to_json(client_stream)},
         {"is_owner", json_boolean(client->stm->peerIsOwner())},
         {"pid", json_integer(client->stm->getPeerProcessID())}});

    std::weak_ptr<Client> clientRef(client->shared_from_this());
    client->unilateralSub.insert(std::make_pair(
        sub,
        root->unilateralResponses->subscribe(
            [clientRef, sub]() {
              auto client = clientRef.lock();
              if (client) {
                client->ping->notify();
              }
            },
            info_json)));
  }

  client->subscriptions[sub->name] = sub;

  resp = make_response();
  resp.set("subscribe", json_ref(jname));

  add_root_warnings_to_response(resp, root);
  ClockSpec position;
  initial_subscription_results = sub->buildSubscriptionResults(
      root, position, OnStateTransition::DontAdvance);
  resp.set("clock", position.toJson());
  auto saved_state_info =
      initial_subscription_results.get_default("saved-state-info");
  if (saved_state_info) {
    resp.set("saved-state-info", std::move(saved_state_info));
  }

  auto asserted_states = json_array();
  {
    auto rootAssertedStates = root->assertedStates.rlock();
    for (const auto& key : sub->drop_or_defer) {
      if (rootAssertedStates->isStateAsserted(key.first)) {
        // Not sure what to do in case of failure here. -jupi
        json_array_append(asserted_states, w_string_to_json(key.first));
      }
    }
  }
  resp.set("asserted-states", json_ref(asserted_states));

  client->enqueueResponse(std::move(resp));
  if (initial_subscription_results) {
    client->enqueueResponse(std::move(initial_subscription_results));
  }
}