static void cmd_flush_subscriptions()

in watchman/cmds/subscribe.cpp [319:441]


static void cmd_flush_subscriptions(Client* clientbase, const json_ref& args) {
  auto client = (UserClient*)clientbase;

  int sync_timeout;
  json_ref subs(nullptr);

  // TODO: merge this parse and sync logic with the logic in query evaluation
  if (json_array_size(args) == 3) {
    auto& sync_timeout_obj = args.at(2).get("sync_timeout");
    subs = args.at(2).get_default("subscriptions", nullptr);
    if (!sync_timeout_obj.isInt()) {
      client->sendErrorResponse("'sync_timeout' must be an integer");
      return;
    }
    sync_timeout = sync_timeout_obj.asInt();
  } else {
    client->sendErrorResponse(
        "wrong number of arguments to 'flush-subscriptions'");
    return;
  }

  auto root = resolveRoot(client, args);

  std::vector<w_string> subs_to_sync;
  if (subs) {
    if (!subs.isArray()) {
      client->sendErrorResponse(
          "expected 'subscriptions' to be an array of subscription names");
      return;
    }

    for (auto& sub_name : subs.array()) {
      if (!sub_name.isString()) {
        client->sendErrorResponse(
            "expected 'subscriptions' to be an array of subscription names");
        return;
      }

      auto& sub_name_str = json_to_w_string(sub_name);
      auto sub_iter = client->subscriptions.find(sub_name_str);
      if (sub_iter == client->subscriptions.end()) {
        client->sendErrorResponse(
            "this client does not have a subscription named '{}'",
            sub_name_str);
        return;
      }
      auto& sub = sub_iter->second;
      if (sub->root != root) {
        client->sendErrorResponse(
            "subscription '{}' is on root '{}' different from command root "
            "'{}'",
            sub_name_str,
            sub->root->root_path,
            root->root_path);
        return;
      }

      subs_to_sync.push_back(sub_name_str);
    }
  } else {
    // Look for all subscriptions matching this root.
    for (auto& sub_iter : client->subscriptions) {
      if (sub_iter.second->root == root) {
        subs_to_sync.push_back(sub_iter.first);
      }
    }
  }

  root->syncToNow(std::chrono::milliseconds(sync_timeout));

  auto resp = make_response();
  auto synced = json_array();
  auto no_sync_needed = json_array();
  auto dropped = json_array();

  for (auto& sub_name_str : subs_to_sync) {
    auto sub_iter = client->subscriptions.find(sub_name_str);
    auto& sub = sub_iter->second;

    sub_action action;
    w_string policy_name;
    auto position = root->view()->getMostRecentRootNumberAndTickValue();
    std::tie(action, policy_name) =
        get_subscription_action(sub.get(), root, position);

    if (action == sub_action::drop) {
      sub->last_sub_tick = position.ticks;
      sub->query->since_spec = std::make_unique<ClockSpec>(position);
      log(DBG,
          "(flush-subscriptions) dropping subscription notifications for ",
          sub->name,
          " until state ",
          policy_name,
          " is vacated. Advanced ticks to ",
          sub->last_sub_tick,
          "\n");
      json_array_append(dropped, w_string_to_json(sub_name_str));
    } else {
      // flush-subscriptions means that we _should NOT defer_ notifications. So
      // ignore defer and defer_vcs.
      ClockSpec out_position;
      log(DBG,
          "(flush-subscriptions) executing subscription ",
          sub->name,
          "\n");
      auto sub_result = sub->buildSubscriptionResults(
          root, out_position, OnStateTransition::QueryAnyway);
      if (sub_result) {
        client->enqueueResponse(std::move(sub_result));
        json_array_append(synced, w_string_to_json(sub_name_str));
      } else {
        json_array_append(no_sync_needed, w_string_to_json(sub_name_str));
      }
    }
  }

  resp.set(
      {{"synced", std::move(synced)},
       {"no_sync_needed", std::move(no_sync_needed)},
       {"dropped", std::move(dropped)}});
  add_root_warnings_to_response(resp, root);
  client->enqueueResponse(std::move(resp));
}