in watchman/Client.cpp [154:343]
void UserClient::clientThread() noexcept {
status_.transitionTo(ClientStatus::THREAD_STARTED);
// Keep a persistent vector around so that we can avoid allocating
// and releasing heap memory when we collect items from the publisher
std::vector<std::shared_ptr<const watchman::Publisher::Item>> pending;
stm->setNonBlock(true);
w_set_thread_name(
"client=",
unique_id,
":stm=",
uintptr_t(stm.get()),
":pid=",
stm->getPeerProcessID());
client_is_owner = stm->peerIsOwner();
EventPoll pfd[2];
pfd[0].evt = stm->getEvents();
pfd[1].evt = ping.get();
bool client_alive = true;
while (!w_is_stopping() && client_alive) {
// Wait for input from either the client socket or
// via the ping pipe, which signals that some other
// thread wants to unilaterally send data to the client
status_.transitionTo(ClientStatus::WAITING_FOR_REQUEST);
ignore_result(w_poll_events(pfd, 2, 2000));
if (w_is_stopping()) {
break;
}
if (pfd[0].ready) {
status_.transitionTo(ClientStatus::DECODING_REQUEST);
json_error_t jerr;
auto request = reader.decodeNext(stm.get(), &jerr);
if (!request && errno == EAGAIN) {
// That's fine
} else if (!request) {
// Not so cool
if (reader.wpos == reader.rpos) {
// If they disconnected in between PDUs, no need to log
// any error
goto disconnected;
}
sendErrorResponse(
"invalid json at position {}: {}", jerr.position, jerr.text);
logf(ERR, "invalid data from client: {}\n", jerr.text);
goto disconnected;
} else if (request) {
pdu_type = reader.pdu_type;
capabilities = reader.capabilities;
status_.transitionTo(ClientStatus::DISPATCHING_COMMAND);
dispatch_command(this, Command::parse(request), CMD_DAEMON);
}
}
if (pfd[1].ready) {
while (ping->testAndClear()) {
status_.transitionTo(ClientStatus::PROCESSING_SUBSCRIPTION);
// Enqueue refs to pending log payloads
pending.clear();
getPending(pending, debugSub, errorSub);
for (auto& item : pending) {
enqueueResponse(json_ref(item->payload));
}
// Maybe we have subscriptions to dispatch?
std::vector<w_string> subsToDelete;
for (auto& [sub, subStream] : unilateralSub) {
watchman::log(
watchman::DBG, "consider fan out sub ", sub->name, "\n");
pending.clear();
subStream->getPending(pending);
bool seenSettle = false;
for (auto& item : pending) {
auto dumped = json_dumps(item->payload, 0);
watchman::log(
watchman::DBG,
"Unilateral payload for sub ",
sub->name,
" ",
dumped,
"\n");
if (item->payload.get_default("canceled")) {
watchman::log(
watchman::ERR,
"Cancel subscription ",
sub->name,
" due to root cancellation\n");
auto resp = make_response();
resp.set(
{{"root", item->payload.get_default("root")},
{"unilateral", json_true()},
{"canceled", json_true()},
{"subscription", w_string_to_json(sub->name)}});
enqueueResponse(std::move(resp));
// Remember to cancel this subscription.
// We can't do it in this loop because that would
// invalidate the iterators and cause a headache.
subsToDelete.push_back(sub->name);
continue;
}
if (item->payload.get_default("state-enter") ||
item->payload.get_default("state-leave")) {
auto resp = make_response();
json_object_update(item->payload, resp);
// We have the opportunity to populate additional response
// fields here (since we don't want to block the command).
// We don't populate the fat clock for SCM aware queries
// because determination of mergeBase could add latency.
resp.set(
{{"unilateral", json_true()},
{"subscription", w_string_to_json(sub->name)}});
enqueueResponse(std::move(resp));
watchman::log(
watchman::DBG,
"Fan out subscription state change for ",
sub->name,
"\n");
continue;
}
if (!sub->debug_paused && item->payload.get_default("settled")) {
seenSettle = true;
continue;
}
}
if (seenSettle) {
sub->processSubscription();
}
}
for (auto& name : subsToDelete) {
unsubByName(name);
}
}
}
/* now send our response(s) */
while (!responses.empty() && client_alive) {
status_.transitionTo(ClientStatus::SENDING_SUBSCRIPTION_RESPONSES);
auto& response_to_send = responses.front();
stm->setNonBlock(false);
/* Return the data in the same format that was used to ask for it.
* Update client liveness based on send success.
*/
client_alive = writer.pduEncodeToStream(
pdu_type, capabilities, response_to_send, stm.get());
stm->setNonBlock(true);
json_ref subscriptionValue = response_to_send.get_default("subscription");
if (kResponseLogLimit && subscriptionValue &&
subscriptionValue.isString() &&
json_string_value(subscriptionValue)) {
auto subscriptionName = json_to_w_string(subscriptionValue);
if (auto* sub = folly::get_ptr(subscriptions, subscriptionName)) {
if ((*sub)->lastResponses.size() >= kResponseLogLimit) {
(*sub)->lastResponses.pop_front();
}
(*sub)->lastResponses.push_back(ClientSubscription::LoggedResponse{
std::chrono::system_clock::now(), response_to_send});
}
}
responses.pop_front();
}
}
disconnected:
status_.transitionTo(ClientStatus::THREAD_STOPPING);
w_set_thread_name(
"NOT_CONN:client=",
unique_id,
":stm=",
uintptr_t(stm.get()),
":pid=",
stm->getPeerProcessID());
}