source/server/watchman.ml (485 lines of code) (raw):

(* * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. *) open Core exception ConnectionError of string exception SubscriptionError of string exception QueryError of string module Raw = struct module Response = struct type t = | Ok of Yojson.Safe.t | EndOfStream | Error of string end module Connection = struct type t = { send: Yojson.Safe.t -> unit Lwt.t; receive: unit -> Response.t Lwt.t; shutdown: unit -> unit Lwt.t; } let send { send; _ } = send let receive { receive; _ } = receive let shutdown { shutdown; _ } = shutdown end type t = { open_connection: unit -> Connection.t Lwt.t } let open_connection { open_connection } = open_connection () let shutdown_connection connection = Connection.shutdown connection () let with_connection ~f { open_connection } = let open Lwt.Infix in open_connection () >>= fun connection -> Lwt.finalize (fun () -> f connection) (Connection.shutdown connection) let create_for_testing ~send ~receive () = let receive () = let open Lwt.Infix in receive () >>= function | Some json -> Lwt.return (Response.Ok json) | None -> Lwt.return Response.EndOfStream in let shutdown () = Lwt.return_unit in let mock_connection = { Connection.send; receive; shutdown } in { open_connection = (fun () -> Lwt.return mock_connection) } let get_watchman_socket_name () = let open Lwt.Infix in LwtSubprocess.run "watchman" ~arguments:["--no-pretty"; "get-sockname"] >>= fun { LwtSubprocess.Completed.status; stdout; stderr } -> match status with | Caml.Unix.WEXITED 0 -> let socket_name = try Yojson.Safe.from_string stdout |> Yojson.Safe.Util.member "sockname" |> Yojson.Safe.Util.to_string with | Yojson.Json_error message -> let message = Format.sprintf "Cannot parse JSON result from watchman getsockname: %s" message in raise (ConnectionError message) in Lwt.return socket_name | WEXITED 127 -> let message = Format.sprintf "Cannot find watchman exectuable under PATH: %s" (Option.value (Sys_utils.getenv_path ()) ~default:"(not set)") in raise (ConnectionError message) | WEXITED code -> let message = Format.sprintf "Watchman exited code %d, stderr = %S" code stderr in raise (ConnectionError message) | WSIGNALED signal -> let message = Format.sprintf "watchman signaled with %s signal" (PrintSignal.string_of_signal signal) in raise (ConnectionError message) | WSTOPPED signal -> let message = Format.sprintf "watchman stopped with %s signal" (PrintSignal.string_of_signal signal) in raise (ConnectionError message) let create_exn () = let open Lwt.Infix in Log.info "Initializing file watching service..."; get_watchman_socket_name () >>= fun socket_name -> let open_connection () = Log.info "Connecting to watchman..."; Lwt_io.open_connection (Lwt_unix.ADDR_UNIX socket_name) >>= fun (input_channel, output_channel) -> Log.info "Established watchman connection."; let send json = Yojson.Safe.to_string json |> Lwt_io.write_line output_channel in let receive () = Lwt_io.read_line_opt input_channel >>= function | None -> Lwt.return Response.EndOfStream | Some line -> ( try let json = Yojson.Safe.from_string line in Lwt.return (Response.Ok json) with | Yojson.Json_error message -> let message = Format.sprintf "Cannot parse JSON from watchman response: %s" message in Lwt.return (Response.Error message)) in let shutdown () = Log.info "Shutting down watchman connection..."; Lwt_io.close input_channel >>= fun () -> Lwt_io.close output_channel in Lwt.return { Connection.send; receive; shutdown } in Lwt.return { open_connection } let create () = let open Lwt.Infix in Lwt.catch (fun () -> create_exn () >>= fun raw -> Lwt.return (Result.Ok raw)) (fun exn -> let message = Format.sprintf "Cannot initialize watchman due to exception: %s" (Exn.to_string exn) in Lwt.return (Result.Error message)) end module Filter = struct type t = { base_names: string list; whole_names: string list; suffixes: string list; } [@@deriving sexp, compare, hash] let from_server_configurations ~critical_files ~extensions ~source_paths () = let base_name_of = function | CriticalFile.BaseName name -> Some name | CriticalFile.FullPath path -> Some (PyrePath.last path) | CriticalFile.Extension _ -> None in let base_names = List.filter_map critical_files ~f:base_name_of |> String.Set.of_list |> fun set -> Set.add set ".pyre_configuration" |> fun set -> Set.add set ".pyre_configuration.local" |> fun set -> (match source_paths with | Configuration.SourcePaths.Buck _ -> let set = Set.add set "TARGETS" in Set.add set "BUCK" | Configuration.SourcePaths.Simple _ | Configuration.SourcePaths.WithUnwatchedDependency _ -> set) |> Set.to_list in let whole_names = match source_paths with | Configuration.SourcePaths.WithUnwatchedDependency { unwatched_dependency = { Configuration.UnwatchedDependency.change_indicator = { Configuration.ChangeIndicator.relative; _ }; _; }; _; } -> (* Change indicator file needs to be watched, since we rely on it to detect changes in unwatched dependencies. *) [relative] | _ -> [] in let extension_of = function | CriticalFile.BaseName _ | CriticalFile.FullPath _ -> (* We do not need to track these files by extensions since they are already tracked by base_names. *) None | CriticalFile.Extension suffix -> Some suffix in let suffixes = List.map ~f:Configuration.Extension.suffix extensions |> List.map ~f:(String.lstrip ~drop:(Char.equal '.')) |> String.Set.of_list |> fun set -> List.filter_map critical_files ~f:extension_of |> List.fold ~init:set ~f:String.Set.add |> fun set -> Set.add set "py" |> fun set -> Set.add set "pyi" |> Set.to_list in { base_names; whole_names; suffixes } let watchman_expression_of { base_names; whole_names; suffixes } = let base_names = List.map base_names ~f:(fun base_name -> `List [`String "match"; `String base_name; `String "basename"]) in let whole_names = List.map whole_names ~f:(fun base_name -> `List [`String "match"; `String base_name; `String "wholename"]) in let suffixes = List.map suffixes ~f:(fun suffix -> `List [`String "suffix"; `String suffix]) in `List [ `String "allof"; `List [`String "type"; `String "f"]; `List (`String "anyof" :: List.concat [suffixes; base_names; whole_names]); ] end module Subscriber = struct module Setting = struct type t = { raw: Raw.t; root: PyrePath.t; filter: Filter.t; } end type t = { setting: Setting.t; connection: Raw.Connection.t; initial_clock: string; } let send_request ~connection request = let open Lwt.Infix in Raw.Connection.send connection request >>= fun () -> Raw.Connection.receive connection () let create_subscribe_request ~root ~filter () = `List [ `String "subscribe"; `String (PyrePath.absolute root); `String "pyre_file_change_subscription"; `Assoc [ "empty_on_fresh_instance", `Bool true; "expression", Filter.watchman_expression_of filter; "fields", `List [`String "name"]; ]; ] let create_watch_project_rquest ~root () = `List [`String "watch-project"; `String (PyrePath.absolute root)] let handle_subscribe_response = function | Raw.Response.Error message -> raise (SubscriptionError message) | Raw.Response.EndOfStream -> raise (SubscriptionError "Cannot get the initial response from `watchman subscribe`") | Raw.Response.Ok initial_response -> ( match Yojson.Safe.Util.member "error" initial_response with | `Null -> ( match Yojson.Safe.Util.member "clock" initial_response with | `String initial_clock -> Lwt.return initial_clock | _ as error -> let message = Format.sprintf "Cannot determinte the initial clock from response %s" (Yojson.Safe.to_string error) in raise (SubscriptionError message)) | _ as error -> let message = Format.sprintf "Subscription rejected by watchman. Response: %s" (Yojson.Safe.to_string error) in raise (SubscriptionError message)) let handle_watch_project_response = function | Raw.Response.Error message -> raise (SubscriptionError message) | Raw.Response.EndOfStream -> raise (SubscriptionError "Cannot get the initial response from `watchman watch-project`") | Raw.Response.Ok initial_response -> ( match Yojson.Safe.Util.member "error" initial_response with | `Null -> Lwt.return_unit | _ as error -> let message = Format.sprintf "Watch-project request rejected by watchman. Response: %s" (Yojson.Safe.to_string error) in raise (SubscriptionError message)) let subscribe ({ Setting.raw; root; filter } as setting) = let open Lwt.Infix in Raw.open_connection raw >>= fun connection -> let do_subscribe () = Log.info "Request watchman subscription at %a" PyrePath.pp root; send_request ~connection (create_watch_project_rquest ~root ()) >>= handle_watch_project_response >>= fun () -> send_request ~connection (create_subscribe_request ~root ~filter ()) >>= handle_subscribe_response >>= fun initial_clock -> Lwt.return { setting; connection; initial_clock } in Lwt.catch do_subscribe (fun exn -> (* Make sure the connection is properly shut down when an exception is raised. *) Raw.shutdown_connection connection >>= fun () -> raise exn) let setting_of { setting; _ } = setting let listen ~f { connection; initial_clock; setting = _ } = let open Lwt.Infix in let rec do_listen () = Raw.Connection.receive connection () >>= function | Raw.Response.Error message -> raise (SubscriptionError message) | Raw.Response.EndOfStream -> Lwt.return_unit | Raw.Response.Ok response -> ( match ( Yojson.Safe.Util.member "is_fresh_instance" response, Yojson.Safe.Util.member "clock" response ) with | `Bool true, `String update_clock when String.equal initial_clock update_clock -> (* This is the initial `is_fresh_instance` message, which can be safely ignored. *) do_listen () | `Bool true, _ -> (* This is not the initial `is_fresh_instance` message, which usually indicates that our current view of the filesystem may not be accurate anymore. *) raise (SubscriptionError "Received `is_fresh_instance` message from watchman") | _, _ -> ( match Yojson.Safe.Util.member "canceled" response with | `Bool true -> (* This means the susbscription is cancelled by watchman. We should not keep going. *) raise (SubscriptionError "Subscription is cancelled by watchman") | _ -> ( let () = match Yojson.Safe.Util.member "warning" response with | `String message -> Log.warning "Received watchman warning: %s" message | _ -> () in match Yojson.Safe.Util.member "files" response with | `Null -> (* This could be a "state-enter"/"state-leave" message which can also be safely ignored. *) do_listen () | files_json -> ( try let root = Yojson.Safe.Util.(member "root" response |> to_string) |> PyrePath.create_absolute in let changed_paths = Yojson.Safe.Util.(convert_each to_string files_json) |> List.map ~f:(fun relative -> PyrePath.create_relative ~root ~relative) in f changed_paths >>= fun () -> do_listen () with | Yojson.Json_error message -> let message = Format.sprintf "Cannot parse JSON result from watchman subscription: %s" message in raise (SubscriptionError message) | Yojson.Safe.Util.Type_error (message, json) | Yojson.Safe.Util.Undefined (message, json) -> let message = Format.sprintf "Unexpected JSON format for watchman subscription: %s. %s." (Yojson.Safe.to_string json) message in raise (SubscriptionError message))))) in Lwt.finalize do_listen (fun () -> Raw.Connection.shutdown connection ()) let with_subscription ~f config = let open Lwt.Infix in subscribe config >>= fun subscriber -> listen ~f subscriber end module SinceQuery = struct module Since = struct module SavedState = struct type t = { storage: string; project_name: string; project_metadata: string option; } [@@deriving sexp, compare, hash] let watchman_expression_of { storage; project_name; project_metadata } = let storage_entry = "storage", `String storage in let configuration_entry = ( "config", let project_name_entry = "project", `String project_name in match project_metadata with | None -> `Assoc [project_name_entry] | Some project_metadata -> let project_metadata_entry = "project-metadata", `String project_metadata in `Assoc [project_name_entry; project_metadata_entry] ) in `Assoc [storage_entry; configuration_entry] end type t = | Clock of string | SourceControlAware of { mergebase_with: string; saved_state: SavedState.t option; } [@@deriving sexp, compare, hash] let watchman_expression_of = function | Clock clock -> `String clock | SourceControlAware { mergebase_with; saved_state } -> `Assoc [ ( "scm", let mergebase_with_entry = "mergebase-with", `String mergebase_with in match saved_state with | None -> `Assoc [mergebase_with_entry] | Some saved_state -> let saved_state_entry = "saved-state", SavedState.watchman_expression_of saved_state in `Assoc [mergebase_with_entry; saved_state_entry] ); ] end module Response = struct module SavedState = struct type t = { bucket: string; path: string; commit_id: string option; } [@@deriving sexp, compare, hash] let of_watchman_response_exn response = let open Yojson.Safe.Util in let bucket = member "manifold-bucket" response |> to_string in let path = member "manifold-path" response |> to_string in let commit_id = match member "commit-id" response with | `String id -> Some id | _ -> None in { bucket; path; commit_id } end type t = { relative_paths: string list; saved_state: SavedState.t option; } [@@deriving sexp, compare, hash] let of_watchman_response_exn response = let open Yojson.Safe.Util in let relative_paths = member "files" response |> to_list |> filter_string in let saved_state = match member "saved-state-info" response with | `Null -> None | _ as response -> SavedState.of_watchman_response_exn response |> Option.some in { relative_paths; saved_state } let of_watchman_response response = try Some (of_watchman_response_exn response) with | _ -> None end type t = { root: PyrePath.t; filter: Filter.t; since: Since.t; } [@@deriving sexp, compare, hash] let watchman_request_of { root; filter; since } = `List [ `String "query"; `String (PyrePath.absolute root); `Assoc [ "fields", `List [`String "name"]; "expression", Filter.watchman_expression_of filter; "since", Since.watchman_expression_of since; ]; ] let query_exn ~connection since_query = let open Lwt.Infix in let request = watchman_request_of since_query in Raw.Connection.send connection request >>= fun () -> Raw.Connection.receive connection () >>= function | Raw.Response.Ok response -> Lwt.return (Response.of_watchman_response_exn response) | Error message -> raise (QueryError message) | EndOfStream -> let message = "Failed to receive any response from watchman server" in raise (QueryError message) let query ~connection since_query = let open Lwt.Infix in Lwt.catch (fun () -> query_exn ~connection since_query >>= fun response -> Lwt.return (Result.Ok response)) (fun exn -> let message = Format.sprintf "Watchman query failed. Exception: %s" (Exn.to_string exn) in Lwt.return (Result.Error message)) end