void UserClient::clientThread()

in watchman/Client.cpp [154:343]


// Per-client service loop, run on a dedicated thread for the lifetime of one
// client connection.  Each iteration alternates between three activities:
//   1. decode and dispatch command PDUs arriving on the client stream,
//   2. drain ping notifications and fan out log/subscription payloads that
//      other threads have published for this client,
//   3. flush any queued responses back to the client.
// Declared noexcept: a thread entry point must never leak an exception.
void UserClient::clientThread() noexcept {
  status_.transitionTo(ClientStatus::THREAD_STARTED);

  // Keep a persistent vector around so that we can avoid allocating
  // and releasing heap memory when we collect items from the publisher
  std::vector<std::shared_ptr<const watchman::Publisher::Item>> pending;

  // Reads must not block; we multiplex the stream with the ping event below.
  stm->setNonBlock(true);
  w_set_thread_name(
      "client=",
      unique_id,
      ":stm=",
      uintptr_t(stm.get()),
      ":pid=",
      stm->getPeerProcessID());

  client_is_owner = stm->peerIsOwner();

  // Two wakeup sources: [0] data readable on the client stream, [1] the ping
  // event signalled by other threads that queued data for this client.
  EventPoll pfd[2];
  pfd[0].evt = stm->getEvents();
  pfd[1].evt = ping.get();

  bool client_alive = true;
  while (!w_is_stopping() && client_alive) {
    // Wait for input from either the client socket or
    // via the ping pipe, which signals that some other
    // thread wants to unilaterally send data to the client

    status_.transitionTo(ClientStatus::WAITING_FOR_REQUEST);
    // NOTE(review): 2000 is presumably a millisecond timeout so that the
    // w_is_stopping() check below runs periodically even when the client is
    // idle -- confirm against w_poll_events.
    ignore_result(w_poll_events(pfd, 2, 2000));
    if (w_is_stopping()) {
      break;
    }

    if (pfd[0].ready) {
      // Client stream is readable: try to decode the next request PDU.
      status_.transitionTo(ClientStatus::DECODING_REQUEST);
      json_error_t jerr;
      auto request = reader.decodeNext(stm.get(), &jerr);

      if (!request && errno == EAGAIN) {
        // That's fine: the nonblocking read has no complete PDU available
        // yet; we'll resume decoding on a later poll wakeup.
      } else if (!request) {
        // Not so cool
        if (reader.wpos == reader.rpos) {
          // If they disconnected in between PDUs, no need to log
          // any error
          goto disconnected;
        }
        // Failure mid-PDU: report the parse error, then drop the client.
        sendErrorResponse(
            "invalid json at position {}: {}", jerr.position, jerr.text);
        logf(ERR, "invalid data from client: {}\n", jerr.text);

        goto disconnected;
      } else if (request) {
        // Remember the request's encoding so responses are written back in
        // the same PDU format and capability set (see the send loop below).
        pdu_type = reader.pdu_type;
        capabilities = reader.capabilities;
        status_.transitionTo(ClientStatus::DISPATCHING_COMMAND);
        dispatch_command(this, Command::parse(request), CMD_DAEMON);
      }
    }

    if (pfd[1].ready) {
      // Drain every queued ping; each one signals that some other thread
      // published data destined for this client.
      while (ping->testAndClear()) {
        status_.transitionTo(ClientStatus::PROCESSING_SUBSCRIPTION);
        // Enqueue refs to pending log payloads
        pending.clear();
        getPending(pending, debugSub, errorSub);
        for (auto& item : pending) {
          enqueueResponse(json_ref(item->payload));
        }

        // Maybe we have subscriptions to dispatch?
        std::vector<w_string> subsToDelete;
        for (auto& [sub, subStream] : unilateralSub) {
          watchman::log(
              watchman::DBG, "consider fan out sub ", sub->name, "\n");

          pending.clear();
          subStream->getPending(pending);
          bool seenSettle = false;
          for (auto& item : pending) {
            auto dumped = json_dumps(item->payload, 0);
            watchman::log(
                watchman::DBG,
                "Unilateral payload for sub ",
                sub->name,
                " ",
                dumped,
                "\n");

            // The watched root was canceled: notify the client and mark the
            // subscription for removal after this iteration completes.
            if (item->payload.get_default("canceled")) {
              watchman::log(
                  watchman::ERR,
                  "Cancel subscription ",
                  sub->name,
                  " due to root cancellation\n");

              auto resp = make_response();
              resp.set(
                  {{"root", item->payload.get_default("root")},
                   {"unilateral", json_true()},
                   {"canceled", json_true()},
                   {"subscription", w_string_to_json(sub->name)}});
              enqueueResponse(std::move(resp));
              // Remember to cancel this subscription.
              // We can't do it in this loop because that would
              // invalidate the iterators and cause a headache.
              subsToDelete.push_back(sub->name);
              continue;
            }

            // State-enter/state-leave notifications are forwarded to the
            // client verbatim, tagged as unilateral for this subscription.
            if (item->payload.get_default("state-enter") ||
                item->payload.get_default("state-leave")) {
              auto resp = make_response();
              json_object_update(item->payload, resp);
              // We have the opportunity to populate additional response
              // fields here (since we don't want to block the command).
              // We don't populate the fat clock for SCM aware queries
              // because determination of mergeBase could add latency.
              resp.set(
                  {{"unilateral", json_true()},
                   {"subscription", w_string_to_json(sub->name)}});
              enqueueResponse(std::move(resp));

              watchman::log(
                  watchman::DBG,
                  "Fan out subscription state change for ",
                  sub->name,
                  "\n");
              continue;
            }

            // "settled" payloads are coalesced: set a flag here and run
            // processSubscription() once after the scan, even if several
            // settle payloads arrived in this batch.  Skipped while the
            // subscription is debug-paused.
            if (!sub->debug_paused && item->payload.get_default("settled")) {
              seenSettle = true;
              continue;
            }
          }

          if (seenSettle) {
            sub->processSubscription();
          }
        }

        // Now it is safe to remove the subscriptions flagged above.
        for (auto& name : subsToDelete) {
          unsubByName(name);
        }
      }
    }

    /* now send our response(s) */
    while (!responses.empty() && client_alive) {
      status_.transitionTo(ClientStatus::SENDING_SUBSCRIPTION_RESPONSES);
      auto& response_to_send = responses.front();

      // Temporarily switch to blocking mode so the full PDU is written out;
      // restored to nonblocking immediately after for the read path.
      stm->setNonBlock(false);
      /* Return the data in the same format that was used to ask for it.
       * Update client liveness based on send success.
       */
      client_alive = writer.pduEncodeToStream(
          pdu_type, capabilities, response_to_send, stm.get());
      stm->setNonBlock(true);

      // If this was a subscription response, retain it in a bounded ring of
      // the subscription's most recent responses (oldest evicted first).
      // NOTE(review): presumably kept for debugging/introspection commands --
      // confirm where lastResponses is consumed.
      json_ref subscriptionValue = response_to_send.get_default("subscription");
      if (kResponseLogLimit && subscriptionValue &&
          subscriptionValue.isString() &&
          json_string_value(subscriptionValue)) {
        auto subscriptionName = json_to_w_string(subscriptionValue);
        if (auto* sub = folly::get_ptr(subscriptions, subscriptionName)) {
          if ((*sub)->lastResponses.size() >= kResponseLogLimit) {
            (*sub)->lastResponses.pop_front();
          }
          (*sub)->lastResponses.push_back(ClientSubscription::LoggedResponse{
              std::chrono::system_clock::now(), response_to_send});
        }
      }

      responses.pop_front();
    }
  }

// Reached on client disconnect or protocol error; the gotos above jump here
// to skip any further request processing for this connection.
disconnected:
  status_.transitionTo(ClientStatus::THREAD_STOPPING);
  // Rename the thread so stack dumps show this client is no longer connected.
  w_set_thread_name(
      "NOT_CONN:client=",
      unique_id,
      ":stm=",
      uintptr_t(stm.get()),
      ":pid=",
      stm->getPeerProcessID());
}