in watchman/ruby/ruby-watchman/ext/ruby-watchman/watchman.c [552:656]
/**
 * Sends `query` over `socket` and returns the decoded response.
 *
 * The query is serialized with RubyWatchman_dump and written with a single
 * send(). The response header is then peeked at to learn the size of the
 * PDU, the whole PDU is read into a temporary buffer, and the body is
 * decoded with watchman_load.
 *
 * The socket is switched to blocking mode for the duration of the exchange
 * (only if it was non-blocking to begin with) and its original file status
 * flags are restored before returning or raising.
 *
 * @param self   receiver, forwarded to RubyWatchman_dump
 * @param query  Ruby object to serialize and send
 * @param socket Ruby object responding to `fileno`
 * @return the value produced by watchman_load for the response body
 *
 * Raises SystemCallError (constructed from errno) when fcntl/send/recv
 * fails, and RuntimeError for short reads/writes, malformed headers, or a
 * failure to change the socket flags.
 *
 * NOTE(review): watchman_load_int, xmalloc and watchman_load may raise a
 * Ruby exception; that would bypass the cleanup code below (leaking `buffer`
 * and/or leaving the socket in blocking mode). Wrapping them in rb_protect /
 * rb_ensure would close that gap -- confirm against their definitions.
 */
VALUE RubyWatchman_query(VALUE self, VALUE query, VALUE socket) {
  VALUE error = Qnil;
  VALUE errorClass = Qnil;
  VALUE loaded = Qnil;
  char* buffer = NULL;
  int flags = 0;
  int restore_flags = 0; // set once O_NONBLOCK has actually been cleared
  int fd = NUM2INT(rb_funcall(socket, rb_intern("fileno"), 0));

  // serialize before touching the socket flags, so that an exception raised
  // while dumping cannot leave the socket in a modified state
  VALUE serialized = RubyWatchman_dump(self, query);
  long query_len = RSTRING_LEN(serialized);

  // do blocking I/O to simplify the following logic
  flags = fcntl(fd, F_GETFL);
  if (flags == -1) {
    goto system_call_fail;
  }
  if (flags & O_NONBLOCK) {
    if (fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == -1) {
      error = rb_str_new2("unable to clear O_NONBLOCK flag");
      goto cleanup;
    }
    restore_flags = 1;
  }

  // send the message
  ssize_t sent = send(fd, RSTRING_PTR(serialized), query_len, 0);
  if (sent == -1) {
    goto system_call_fail;
  } else if (sent != query_len) {
    error = rb_str_new2("sent byte count mismatch");
    goto cleanup;
  }

  // sniff to see how large the header is
  int8_t peek[WATCHMAN_PEEK_BUFFER_SIZE];
  ssize_t received =
      recv(fd, peek, WATCHMAN_SNIFF_BUFFER_SIZE, MSG_PEEK | MSG_WAITALL);
  if (received == -1) {
    goto system_call_fail;
  } else if (received != WATCHMAN_SNIFF_BUFFER_SIZE) {
    error = rb_str_new2("failed to sniff PDU header");
    goto cleanup;
  }

  // peek at size of PDU; the table maps an integer type marker to the number
  // of bytes that the encoded integer occupies
  static const int8_t sizes[] = {0, 0, 0, 1, 2, 4, 8};
  int8_t sizes_idx = peek[sizeof(WATCHMAN_BINARY_MARKER) - 1];
  if (sizes_idx < WATCHMAN_INT8_MARKER || sizes_idx > WATCHMAN_INT64_MARKER) {
    error = rb_str_new2("bad PDU size marker");
    goto cleanup;
  }
  ssize_t peek_size =
      sizeof(WATCHMAN_BINARY_MARKER) - 1 + sizeof(int8_t) + sizes[sizes_idx];

  // MSG_WAITALL: the rest of the header may not have arrived yet, and a
  // short peek would otherwise be reported as a failure
  received = recv(fd, peek, peek_size, MSG_PEEK | MSG_WAITALL);
  if (received == -1) {
    goto system_call_fail;
  } else if (received != peek_size) {
    error = rb_str_new2("failed to peek at PDU header");
    goto cleanup;
  }

  // decode the size integer, starting at its type marker
  int8_t* pdu_size_ptr = peek + sizeof(WATCHMAN_BINARY_MARKER) - sizeof(int8_t);
  int64_t body_size =
      watchman_load_int((char**)&pdu_size_ptr, (char*)peek + peek_size);

  // the size comes off the wire: reject values that are negative or that
  // would overflow once the header length is added
  if (body_size < 0 || body_size > INT64_MAX - (int64_t)peek_size) {
    error = rb_str_new2("bad PDU size");
    goto cleanup;
  }

  // total bytes to read: header (still unread, it was only peeked) + body
  int64_t payload_size = (int64_t)peek_size + body_size;

  // actually read the PDU; xmalloc raises NoMemoryError itself on failure
  // rather than returning NULL, so there is no NULL check here
  buffer = xmalloc(payload_size);
  received = recv(fd, buffer, payload_size, MSG_WAITALL);
  if (received == -1) {
    goto system_call_fail;
  } else if (received != payload_size) {
    error = rb_str_new2("failed to load PDU");
    goto cleanup;
  }

  // restore the flags before decoding, so that an exception raised by
  // watchman_load cannot leave the socket in blocking mode
  if (restore_flags) {
    restore_flags = 0;
    if (fcntl(fd, F_SETFL, flags) == -1) {
      error = rb_str_new2("unable to restore fnctl flags");
      goto cleanup;
    }
  }

  // the body starts after the header; the end bound is the end of the
  // buffer (payload_size already includes the header length)
  char* payload = buffer + peek_size;
  loaded = watchman_load(&payload, buffer + payload_size);
  goto cleanup;

system_call_fail:
  // capture errno immediately, before any cleanup call can overwrite it
  errorClass = rb_eSystemCallError;
  error = INT2FIX(errno);

cleanup:
  if (buffer) {
    xfree(buffer);
  }

  if (restore_flags && fcntl(fd, F_SETFL, flags) == -1) {
    rb_raise(rb_eRuntimeError, "unable to restore fnctl flags");
  }

  if (NIL_P(errorClass)) {
    errorClass = rb_eRuntimeError;
  }

  if (!NIL_P(error)) {
    rb_exc_raise(rb_class_new_instance(1, &error, errorClass));
  }

  return loaded;
}