VALUE RubyWatchman_query()

in watchman/ruby/ruby-watchman/ext/ruby-watchman/watchman.c [552:656]


/**
 * Sends `query` to a watchman server over `socket` and returns the decoded
 * response.
 *
 * The query is serialized with RubyWatchman_dump and written with a single
 * send(). The reply header is then peeked at (without consuming it) to learn
 * the total PDU length, the whole PDU is read into a temporary buffer, and the
 * body following the header is decoded with watchman_load.
 *
 * If the socket is in non-blocking mode it is switched to blocking mode for
 * the duration of the exchange and its original flags are restored before
 * returning or raising from this function's own error paths.
 *
 * @param self    receiver; forwarded to RubyWatchman_dump
 * @param query   Ruby object to serialize and send
 * @param socket  Ruby object responding to `fileno` with a socket descriptor
 * @return        the Ruby value produced by watchman_load
 *
 * Raises SystemCallError (built from errno) when fcntl(F_GETFL), send() or
 * recv() fails, and RuntimeError for short transfers, a bad size marker, a
 * bad PDU size, or failure to change/restore the descriptor flags.
 *
 * NOTE(review): helpers called while the buffer is live or the socket mode is
 * changed (watchman_load_int, watchman_load, xmalloc) may raise Ruby
 * exceptions, which would bypass the cleanup label -- confirm and consider
 * wrapping them with rb_protect.
 */
VALUE RubyWatchman_query(VALUE self, VALUE query, VALUE socket) {
  // Indexed by the integer-type marker byte found in the header; each entry is
  // the number of bytes occupied by the integer that follows that marker.
  static const int8_t sizes[] = {0, 0, 0, 1, 2, 4, 8};

  VALUE error = Qnil;
  VALUE errorClass = Qnil;
  VALUE loaded = Qnil;
  VALUE serialized;
  char* buffer = NULL;
  char* payload;
  char* pdu_size_ptr;
  int8_t peek[WATCHMAN_PEEK_BUFFER_SIZE];
  int sizes_idx;
  int flags;
  int restore_flags = 0; // non-zero while O_NONBLOCK is cleared and must be put back
  long query_len;
  ssize_t sent;
  ssize_t received;
  ssize_t peek_size;
  int64_t body_size;
  int64_t payload_size;
  int fileno = NUM2INT(rb_funcall(socket, rb_intern("fileno"), 0));

  // serialize before touching the socket mode so that an exception raised
  // while dumping cannot leave the descriptor in a modified state
  serialized = RubyWatchman_dump(self, query);

  // do blocking I/O to simplify the following logic
  flags = fcntl(fileno, F_GETFL);
  if (flags == -1) {
    goto system_call_fail;
  }
  if (flags & O_NONBLOCK) {
    if (fcntl(fileno, F_SETFL, flags & ~O_NONBLOCK) == -1) {
      error = rb_str_new2("unable to clear O_NONBLOCK flag");
      goto cleanup;
    }
    restore_flags = 1;
  }

  // send the message
  query_len = RSTRING_LEN(serialized);
  sent = send(fileno, RSTRING_PTR(serialized), query_len, 0);
  if (sent == -1) {
    goto system_call_fail;
  } else if (sent != query_len) {
    error = rb_str_new2("sent byte count mismatch");
    goto cleanup;
  }

  // sniff to see how large the header is
  received =
      recv(fileno, peek, WATCHMAN_SNIFF_BUFFER_SIZE, MSG_PEEK | MSG_WAITALL);
  if (received == -1) {
    goto system_call_fail;
  } else if (received != WATCHMAN_SNIFF_BUFFER_SIZE) {
    error = rb_str_new2("failed to sniff PDU header");
    goto cleanup;
  }

  // peek at size of PDU: the byte right after the binary marker says how wide
  // the length integer is
  sizes_idx = peek[sizeof(WATCHMAN_BINARY_MARKER) - 1];
  if (sizes_idx < WATCHMAN_INT8_MARKER || sizes_idx > WATCHMAN_INT64_MARKER) {
    error = rb_str_new2("bad PDU size marker");
    goto cleanup;
  }
  peek_size =
      sizeof(WATCHMAN_BINARY_MARKER) - 1 + sizeof(int8_t) + sizes[sizes_idx];

  // wait for the complete header; a plain MSG_PEEK may return only the bytes
  // that have already arrived
  received = recv(fileno, peek, peek_size, MSG_PEEK | MSG_WAITALL);
  if (received == -1) {
    goto system_call_fail;
  } else if (received != peek_size) {
    error = rb_str_new2("failed to peek at PDU header");
    goto cleanup;
  }

  // decode the length integer, starting at its type marker
  pdu_size_ptr = (char*)peek + sizeof(WATCHMAN_BINARY_MARKER) - sizeof(int8_t);
  body_size = watchman_load_int(&pdu_size_ptr, (char*)peek + peek_size);

  // the size comes off the wire: reject values that are negative or that
  // would overflow when the header length is added
  if (body_size < 0 || body_size > INT64_MAX - (int64_t)peek_size) {
    error = rb_str_new2("bad PDU size");
    goto cleanup;
  }
  payload_size = (int64_t)peek_size + body_size;

  // actually read the PDU (header included, since it was only peeked at)
  buffer = xmalloc(payload_size);
  if (!buffer) {
    // defensive only: Ruby's xmalloc normally raises rather than return NULL
    errorClass = rb_eNoMemError;
    error = rb_str_new2("failed to allocate");
    goto cleanup;
  }
  received = recv(fileno, buffer, payload_size, MSG_WAITALL);
  if (received == -1) {
    goto system_call_fail;
  } else if (received != payload_size) {
    error = rb_str_new2("failed to load PDU");
    goto cleanup;
  }

  // put the socket back before decoding, so the flags are already restored
  // should decoding raise
  if (restore_flags) {
    if (fcntl(fileno, F_SETFL, flags) == -1) {
      error = rb_str_new2("unable to restore fnctl flags");
      goto cleanup;
    }
    restore_flags = 0;
  }

  // skip the header; the decoder must stop at the end of the allocation
  payload = buffer + peek_size;
  loaded = watchman_load(&payload, buffer + payload_size);
  goto cleanup;

system_call_fail:
  // reached directly from the failing call, so errno is still its error code
  errorClass = rb_eSystemCallError;
  error = INT2FIX(errno);

cleanup:
  if (buffer) {
    xfree(buffer);
  }

  if (restore_flags && fcntl(fileno, F_SETFL, flags) == -1) {
    rb_raise(rb_eRuntimeError, "unable to restore fnctl flags");
  }

  if (NIL_P(errorClass)) {
    errorClass = rb_eRuntimeError;
  }

  if (!NIL_P(error)) {
    rb_exc_raise(rb_class_new_instance(1, &error, errorClass));
  }

  return loaded;
}