watchman/PDU.cpp (504 lines of code) (raw):
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
#include "watchman/PDU.h"
#include <folly/Range.h>
#include <folly/String.h>
#include "watchman/CommandRegistry.h"
#include "watchman/Constants.h"
#include "watchman/Logging.h"
#include "watchman/bser.h"
#include "watchman/watchman_stream.h"
namespace watchman {
W_CAP_REG("bser-v2")
PduBuffer::PduBuffer()
: buf((char*)malloc(WATCHMAN_IO_BUF_SIZE)),
allocd(WATCHMAN_IO_BUF_SIZE),
rpos(0),
wpos(0),
pdu_type(need_data),
capabilities(0) {
if (!buf) {
throw std::bad_alloc();
}
}
void PduBuffer::clear() {
wpos = 0;
rpos = 0;
}
PduBuffer::~PduBuffer() {
free(buf);
}
// Shunt down, return available size
uint32_t PduBuffer::shuntDown() {
if (rpos && rpos == wpos) {
rpos = 0;
wpos = 0;
}
if (rpos && rpos < wpos) {
memmove(buf, buf + rpos, wpos - rpos);
wpos -= rpos;
rpos = 0;
}
return allocd - wpos;
}
bool PduBuffer::fillBuffer(watchman_stream* stm) {
uint32_t avail;
int r;
avail = shuntDown();
// Get some more space if we need it
if (avail == 0) {
char* newBuf = (char*)realloc(buf, allocd * 2);
if (!newBuf) {
return false;
}
buf = newBuf;
allocd *= 2;
avail = allocd - wpos;
}
errno = 0;
r = stm->read(buf + wpos, avail);
if (r <= 0) {
return false;
}
wpos += r;
return true;
}
inline PduType PduBuffer::detectPdu() {
if (wpos - rpos < 2) {
return need_data;
}
if (memcmp(buf + rpos, BSER_MAGIC, 2) == 0) {
return is_bser;
}
if (memcmp(buf + rpos, BSER_V2_MAGIC, 2) == 0) {
return is_bser_v2;
}
return is_json_compact;
}
json_ref PduBuffer::readJsonPrettyPdu(
watchman_stream* stm,
json_error_t* jerr) {
char* nl;
int r;
json_ref res;
// Assume newline is at the end of what we have
nl = buf + wpos;
r = (int)(nl - (buf + rpos));
res = json_loadb(buf + rpos, r, 0, jerr);
while (!res) {
// Maybe we can fill more data into the buffer and retry?
if (!fillBuffer(stm)) {
// No, then error is terminal
return nullptr;
}
// Recompute end of buffer
nl = buf + wpos;
r = (int)(nl - (buf + rpos));
// And try parsing this
res = json_loadb(buf + rpos, r, 0, jerr);
}
// update read pos to look beyond this point
rpos += r + 1;
return res;
}
json_ref PduBuffer::readJsonPdu(watchman_stream* stm, json_error_t* jerr) {
int r;
/* look for a newline; that indicates the end of
* a json packet */
auto nl = (char*)memchr(buf + rpos, '\n', wpos - rpos);
// If we don't have a newline, we need to fill the
// buffer
while (!nl) {
if (!fillBuffer(stm)) {
if (errno == 0 && stm == w_stm_stdin()) {
// Ugly-ish hack to support the -j CLI option. This allows
// us to consume a JSON input that doesn't end with a newline.
// We only allow this on EOF when reading from stdin
nl = buf + wpos;
break;
}
return nullptr;
}
nl = (char*)memchr(buf + rpos, '\n', wpos - rpos);
}
// buflen
r = (int)(nl - (buf + rpos));
auto res = json_loadb(buf + rpos, r, 0, jerr);
// update read pos to look beyond this point
rpos += r + 1;
return res;
}
bool PduBuffer::decodePduInfo(
watchman_stream* stm,
uint32_t bser_version,
json_int_t* len,
json_int_t* bser_capabilities,
json_error_t* jerr) {
json_int_t needed;
if (bser_version == 2) {
uint32_t capabilities;
while (wpos - rpos < sizeof(capabilities)) {
if (!fillBuffer(stm)) {
snprintf(jerr->text, sizeof(jerr->text), "unable to fill buffer");
return false;
}
}
// json_int_t is architecture-dependent, so go through the uint32_t for
// safety.
memcpy(&capabilities, buf + rpos, sizeof(capabilities));
*bser_capabilities = capabilities;
rpos += sizeof(capabilities);
}
while (!bunser_int(buf + rpos, wpos - rpos, &needed, len)) {
if (needed == -1) {
snprintf(jerr->text, sizeof(jerr->text), "failed to read PDU size");
return false;
}
if (!fillBuffer(stm)) {
snprintf(jerr->text, sizeof(jerr->text), "unable to fill buffer");
return false;
}
}
rpos += (uint32_t)needed;
return true;
}
json_ref PduBuffer::readBserPdu(
watchman_stream* stm,
uint32_t bser_version,
json_error_t* jerr) {
json_int_t needed;
json_int_t val;
json_int_t bser_capabilities;
uint32_t ideal;
int r;
json_ref obj;
rpos += 2;
// We don't handle EAGAIN cleanly in here
stm->setNonBlock(false);
if (!decodePduInfo(stm, bser_version, &val, &bser_capabilities, jerr)) {
return nullptr;
}
// val tells us exactly how much storage we need for this PDU
if (val > allocd - wpos) {
ideal = allocd;
while ((ideal - wpos) < (uint32_t)val) {
ideal *= 2;
}
if (ideal > allocd) {
auto newBuf = (char*)realloc(buf, ideal);
if (!newBuf) {
snprintf(
jerr->text,
sizeof(jerr->text),
"out of memory while allocating %" PRIu32 " bytes",
ideal);
return nullptr;
}
buf = newBuf;
allocd = ideal;
}
}
// We have enough room for the whole thing, let's read it in
while ((wpos - rpos) < val) {
r = stm->read(buf + wpos, allocd - wpos);
if (r <= 0) {
jerr->position = wpos - rpos;
snprintf(
jerr->text,
sizeof(jerr->text),
"error reading %" PRIu32 " bytes val=%" PRIu64 " wpos=%" PRIu32
" rpos=%" PRIu32 " for PDU: %s",
uint32_t(allocd - wpos),
int64_t(val),
wpos,
rpos,
folly::errnoStr(errno).c_str());
return nullptr;
}
wpos += r;
}
obj = bunser(buf + rpos, buf + wpos, &needed, jerr);
if (!obj) {
// obj is a nullptr because deserialization failed. Log the message that
// failed to deserialize to stderr
logf(
ERR,
"decoding BSER failed. The first KB of the hex representation of "
"message follows:\n{:.1024}\n",
folly::hexlify(folly::ByteRange{
reinterpret_cast<const unsigned char*>(buf + rpos), wpos - rpos}));
}
// Ensure that we move the read position to the wpos; we consumed it all
rpos = wpos;
stm->setNonBlock(true);
return obj;
}
bool PduBuffer::readAndDetectPdu(watchman_stream* stm, json_error_t* jerr) {
PduType pdu;
// The client might send us different kinds of PDUs over the same connection,
// so reset the capabilities.
capabilities = 0;
shuntDown();
pdu = detectPdu();
if (pdu == need_data) {
if (!fillBuffer(stm)) {
if (errno != EAGAIN) {
snprintf(
jerr->text,
sizeof(jerr->text),
"fill_buffer: %s",
errno ? folly::errnoStr(errno).c_str() : "EOF");
}
return false;
}
pdu = detectPdu();
}
if (pdu == is_bser_v2) {
// read capabilities (since we haven't increased rpos, first two bytes are
// still the header)
while (wpos - rpos < 2 + sizeof(capabilities)) {
if (!fillBuffer(stm)) {
if (errno != EAGAIN) {
snprintf(
jerr->text,
sizeof(jerr->text),
"fillBuffer: %s",
errno ? folly::errnoStr(errno).c_str() : "EOF");
}
return false;
}
}
// Copy the capabilities over. BSER is system-endian so this is safe.
memcpy(&capabilities, buf + rpos + 2, sizeof(capabilities));
}
if (pdu == is_json_compact && stm == w_stm_stdin()) {
// Minor hack for the `-j` option for reading pretty printed
// json from stdin
pdu = is_json_pretty;
}
pdu_type = pdu;
return true;
}
static bool output_bytes(const char* buf, int x) {
auto& stm = FileDescriptor::stdOut();
while (x > 0) {
auto res = stm.write(buf, x);
if (res.hasError()) {
errno = res.error().value();
#ifdef _WIN32
// TODO: propagate Result<int, std::error_code> as return type
errno = map_win32_err(errno);
#endif
return false;
}
auto len = res.value();
buf += len;
x -= len;
}
return true;
}
bool PduBuffer::streamUntilNewLine(watchman_stream* stm) {
int x;
char* localBuf;
bool is_done = false;
while (true) {
localBuf = buf + rpos;
auto nl = (char*)memchr(localBuf, '\n', wpos - rpos);
if (nl) {
x = 1 + (int)(nl - localBuf);
is_done = true;
} else {
x = wpos - rpos;
}
if (!output_bytes(localBuf, x)) {
return false;
}
localBuf += x;
rpos += x;
if (is_done) {
break;
}
if (!fillBuffer(stm)) {
break;
}
}
return true;
}
bool PduBuffer::streamN(
watchman_stream* stm,
json_int_t len,
json_error_t* jerr) {
uint32_t total = 0;
if (!output_bytes(buf, rpos)) {
snprintf(
jerr->text,
sizeof(jerr->text),
"failed output headers bytes %d: %s\n",
rpos,
folly::errnoStr(errno).c_str());
return false;
}
while (len > 0) {
uint32_t avail = wpos - rpos;
int r;
if (avail) {
if (!output_bytes(buf + rpos, avail)) {
snprintf(
jerr->text,
sizeof(jerr->text),
"output_bytes: avail=%d, failed %s\n",
avail,
folly::errnoStr(errno).c_str());
return false;
}
rpos += avail;
len -= avail;
if (len == 0) {
return true;
}
}
avail = std::min((uint32_t)len, shuntDown());
r = stm->read(buf + wpos, avail);
if (r <= 0) {
snprintf(
jerr->text,
sizeof(jerr->text),
"read: len=%" PRIi64 " wanted %" PRIu32 " got %d %s\n",
(int64_t)len,
avail,
r,
folly::errnoStr(errno).c_str());
return false;
}
wpos += r;
total += r;
}
return true;
}
bool PduBuffer::streamPdu(watchman_stream* stm, json_error_t* jerr) {
uint32_t bser_version = 1;
json_int_t bser_capabilities;
json_int_t len;
switch (pdu_type) {
case is_json_compact:
case is_json_pretty:
return streamUntilNewLine(stm);
case is_bser:
case is_bser_v2: {
if (pdu_type == is_bser_v2) {
bser_version = 2;
} else {
bser_version = 1;
}
rpos += 2;
if (!decodePduInfo(stm, bser_version, &len, &bser_capabilities, jerr)) {
return false;
}
return streamN(stm, len, jerr);
}
default:
logf(FATAL, "not streaming for pdu type {}\n", pdu_type);
return false;
}
}
json_ref PduBuffer::decodePdu(watchman_stream* stm, json_error_t* jerr) {
switch (pdu_type) {
case is_json_compact:
return readJsonPdu(stm, jerr);
case is_json_pretty:
return readJsonPrettyPdu(stm, jerr);
case is_bser_v2:
return readBserPdu(stm, 2, jerr);
default: // bser v1
return readBserPdu(stm, 1, jerr);
}
}
bool PduBuffer::passThru(
PduType output_pdu,
uint32_t output_capabilities,
PduBuffer* output_pdu_buf,
watchman_stream* stm) {
json_error_t jerr;
bool res;
stm->setNonBlock(false);
if (!readAndDetectPdu(stm, &jerr)) {
logf(ERR, "failed to identify PDU: {}\n", jerr.text);
return false;
}
if (pdu_type == output_pdu) {
// We can stream it through
if (!streamPdu(stm, &jerr)) {
logf(ERR, "stream_pdu: {}\n", jerr.text);
return false;
}
return true;
}
auto j = decodePdu(stm, &jerr);
if (!j) {
logf(ERR, "failed to parse response: {}\n", jerr.text);
return false;
}
output_pdu_buf->clear();
res = output_pdu_buf->pduEncodeToStream(
output_pdu, output_capabilities, j, w_stm_stdout());
return res;
}
json_ref PduBuffer::decodeNext(watchman_stream* stm, json_error_t* jerr) {
*jerr = json_error_t();
if (!readAndDetectPdu(stm, jerr)) {
return nullptr;
}
return decodePdu(stm, jerr);
}
struct jbuffer_write_data {
watchman_stream* stm;
PduBuffer* jr;
bool flush() {
int x;
while (jr->wpos - jr->rpos) {
x = stm->write(jr->buf + jr->rpos, jr->wpos - jr->rpos);
if (x <= 0) {
return false;
}
jr->rpos += x;
}
jr->clear();
return true;
}
static int write(const char* buffer, size_t size, void* ptr) {
auto data = (jbuffer_write_data*)ptr;
return data->write(buffer, size);
}
int write(const char* buffer, size_t size) {
while (size) {
// Accumulate in the buffer
int room = jr->allocd - jr->wpos;
// No room? send it over the wire
if (!room) {
if (!flush()) {
return -1;
}
room = jr->allocd - jr->wpos;
}
if ((int)size < room) {
room = (int)size;
}
// Stick it in the buffer
memcpy(jr->buf + jr->wpos, buffer, room);
buffer += room;
size -= room;
jr->wpos += room;
}
return 0;
}
};
bool PduBuffer::bserEncodeToStream(
uint32_t bser_version,
uint32_t bser_capabilities,
const json_ref& json,
watchman_stream* stm) {
struct jbuffer_write_data data = {stm, this};
int res;
res = w_bser_write_pdu(
bser_version, bser_capabilities, jbuffer_write_data::write, json, &data);
if (res != 0) {
return false;
}
return data.flush();
}
bool PduBuffer::jsonEncodeToStream(
const json_ref& json,
watchman_stream* stm,
int flags) {
struct jbuffer_write_data data = {stm, this};
int res;
res = json_dump_callback(json, jbuffer_write_data::write, &data, flags);
if (res != 0) {
return false;
}
if (data.write("\n", 1) != 0) {
return false;
}
return data.flush();
}
bool PduBuffer::pduEncodeToStream(
PduType pdu_type,
uint32_t capabilities,
const json_ref& json,
watchman_stream* stm) {
switch (pdu_type) {
case is_json_compact:
return jsonEncodeToStream(json, stm, JSON_COMPACT);
case is_json_pretty:
return jsonEncodeToStream(json, stm, JSON_INDENT(4));
case is_bser:
return bserEncodeToStream(1, capabilities, json, stm);
case is_bser_v2:
return bserEncodeToStream(2, capabilities, json, stm);
case need_data:
default:
return false;
}
}
/* vim:ts=2:sw=2:et:
*/
} // namespace watchman