source/server/start.ml (644 lines of code) (raw):
(*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*)
open Core
(* Events reported by the server over an optional event channel. Each event is rendered as one
   line of JSON. *)
module ServerEvent = struct
  (* Classification attached to an `Exception` event. *)
  module ErrorKind = struct
    type t =
      | Watchman
      | BuckInternal
      | BuckUser
      | Pyre
      | Unknown
    [@@deriving sexp, compare, hash, to_yojson]
  end

  type t =
    | SocketCreated of PyrePath.t
    | ServerInitialized
    | Exception of string * ErrorKind.t
  [@@deriving sexp, compare, hash, to_yojson]

  (* Render the event as a JSON string. *)
  let serialize event = Yojson.Safe.to_string (to_yojson event)

  (* Write the serialized event followed by a newline to `output_channel`, then flush the channel. *)
  let write ~output_channel event =
    let open Lwt.Infix in
    Lwt_io.fprintl output_channel (serialize event) >>= fun () -> Lwt_io.flush output_channel
end
(* Outcome of a server run, together with its mapping to a process exit code. *)
module ExitStatus = struct
  type t =
    | Ok
    | Error
  [@@deriving sexp, compare, hash]

  (* `Ok` maps to 0 and `Error` maps to 1. *)
  let exit_code status =
    match status with
    | Ok -> 0
    | Error -> 1
end
(* Run `f ()` and, once its promise is fulfilled, log a performance sample named `name`. The sample
   carries the caller-supplied `normals` plus the binary version. Nothing is logged when `f ()`
   fails. *)
let with_performance_logging ?(normals = []) ~name f =
  let timer = Timer.start () in
  let log_and_return result =
    let normals = ("binary_version", Version.version ()) :: normals in
    Statistics.performance ~name ~timer ~normals ();
    Lwt.return result
  in
  Lwt.bind (f ()) log_and_return
(* The different kinds of messages a client may send over a connection. *)
module ClientRequest = struct
  type t =
    | GetInfo
    | StopServer
    | Request of Request.t
    | Subscription of Subscription.Request.t
    | Error of string
  [@@deriving sexp, compare]

  (* Parse one raw message. The two literal commands are recognized first; anything else is tried
     as a subscription request and then as a regular request. Input that cannot be parsed yields
     `Error` rather than raising. *)
  let of_string input_string =
    let classify json =
      match json with
      | `List [`String "GetInfo"] -> GetInfo
      | `List [`String "Stop"] -> StopServer
      | _ -> (
          match Subscription.Request.of_yojson json with
          | Result.Ok subscription -> Subscription subscription
          | Result.Error _ -> (
              match Request.of_yojson json with
              | Result.Ok request -> Request request
              | Result.Error _ -> Error "Malformed JSON request"))
    in
    try input_string |> Yojson.Safe.from_string |> classify with
    | Yojson.Json_error message -> Error message
end
(* Process one request against `state`, logging before and after, and recording a performance
   sample. Any exception escaping the request handler is logged as fatal and the server is
   stopped. *)
let handle_request ~properties ~state request =
  let open Lwt.Infix in
  (* Attribute an exception to the subsystem it came from, for statistics purposes. *)
  let origin_of_exception = function
    | Buck.Raw.BuckError _
    | Buck.Interface.JsonError _
    | Buck.Builder.LinkTreeConstructionError _ ->
        "buck"
    | Watchman.ConnectionError _
    | Watchman.SubscriptionError _
    | Watchman.QueryError _ ->
        "watchman"
    | _ -> "server"
  in
  let on_uncaught_server_exception exn =
    Log.info "Uncaught server exception: %s" (Exn.to_string exn);
    let { ServerProperties.configuration; _ } = properties in
    StartupNotification.produce
      ~log_path:configuration.log_directory
      "Restarting Pyre server due to unexpected crash";
    Statistics.log_exception exn ~fatal:true ~origin:(origin_of_exception exn);
    Stop.log_and_stop_waiting_server ~reason:"uncaught exception" ~properties ()
  in
  let process () =
    Log.log ~section:`Server "Processing request %a..." Sexp.pp (Request.sexp_of_t request);
    with_performance_logging
      ~normals:["request kind", Request.name_of request]
      ~name:"server request"
      (fun () -> RequestHandler.process_request ~properties ~state request)
  in
  Lwt.catch process on_uncaught_server_exception
  >>= fun outcome ->
  Log.log ~section:`Server "Request `%a` processed" Sexp.pp (Request.sexp_of_t request);
  Lwt.return outcome
(* Create a subscription that writes to `output_channel`, register it in the subscription registry
   of the server state under the subscriber's name, and return it. *)
let handle_subscription ~state ~output_channel = function
  | Subscription.Request.SubscribeToTypeErrors name ->
      let { ServerState.subscriptions; _ } = state in
      let subscription = Subscription.create ~name ~output_channel () in
      ServerState.Subscriptions.add subscriptions ~name ~subscription;
      subscription
module ConnectionState = struct
  (* Remember which subscriptions were created through a given connection, so that they can be
     removed from the server state automatically once that connection is closed. *)
  type t = { subscription_names: string list }

  (* A connection starts without any subscription. *)
  let create () = { subscription_names = [] }

  (* Record (and log) a newly created subscription; the newest name goes to the front. *)
  let add_subscription ~name { subscription_names = existing_names } =
    Log.log ~section:`Server "Subscription added: %s" name;
    { subscription_names = name :: existing_names }

  (* Remove (and log) every subscription recorded for this connection from the server state. *)
  let cleanup ~server_state { subscription_names } =
    let { ServerState.subscriptions; _ } = server_state in
    let remove_subscription name =
      Log.log ~section:`Server "Subscription removed: %s" name;
      ServerState.Subscriptions.remove ~name subscriptions
    in
    List.iter subscription_names ~f:remove_subscription
end
(* Serve a single client connection: read requests line by line, dispatch each of them, and write
   one JSON response line per request. When the client closes the connection, the subscriptions it
   created are removed from the server state. *)
let handle_connection
    ~server_properties
    ~server_state
    _client_address
    (input_channel, output_channel)
  =
  let open Lwt.Infix in
  Log.log ~section:`Server "Connection established";
  (* Serialize and send a response. Failures while writing are logged and otherwise ignored, so
     the connection loop keeps going. *)
  let send_response response =
    let raw_response = Yojson.Safe.to_string (Response.to_yojson response) in
    Lwt.catch
      (fun () -> Lwt_io.write_line output_channel raw_response)
      (fun exn ->
        Log.log
          ~section:`Server
          "Exception occurred while sending responses: %s"
          (Exn.to_string exn);
        Lwt.return_unit)
  in
  (* Turn one raw message into the updated connection state and the response to send back. *)
  let process_message connection_state message =
    match ClientRequest.of_string message with
    | ClientRequest.Error message -> Lwt.return (connection_state, Response.Error message)
    | ClientRequest.GetInfo ->
        let response = RequestHandler.create_info_response server_properties in
        Lwt.return (connection_state, response)
    | ClientRequest.StopServer ->
        Stop.log_and_stop_waiting_server
          ~reason:"explicit request"
          ~properties:server_properties
          ()
    | ClientRequest.Request request ->
        ExclusiveLock.Lazy.write server_state ~f:(fun state ->
            handle_request ~properties:server_properties ~state request
            >>= fun (new_state, response) ->
            Lwt.return (new_state, (connection_state, response)))
    | ClientRequest.Subscription subscription ->
        ExclusiveLock.Lazy.write server_state ~f:(fun state ->
            let subscription = handle_subscription ~state ~output_channel subscription in
            (* The initial set of type errors is sent back as soon as a subscription gets
               established. *)
            handle_request ~properties:server_properties ~state (Request.DisplayTypeError [])
            >>= fun (new_state, response) ->
            let new_connection_state =
              ConnectionState.add_subscription
                ~name:(Subscription.name_of subscription)
                connection_state
            in
            Lwt.return (new_state, (new_connection_state, response)))
  in
  (* Raw request messages are processed line-by-line. *)
  let rec handle_line connection_state =
    Lwt_io.read_line_opt input_channel
    >>= function
    | None ->
        Log.log ~section:`Server "Connection closed";
        ExclusiveLock.Lazy.write server_state ~f:(fun server_state ->
            ConnectionState.cleanup ~server_state connection_state;
            Lwt.return (server_state, ()))
    | Some message ->
        process_message connection_state message
        >>= fun (new_connection_state, response) ->
        send_response response >>= fun () -> handle_line new_connection_state
  in
  handle_line (ConnectionState.create ())
(* Build the server properties from the analysis configuration plus the socket path and critical
   files found in the start options. *)
let create_server_properties ~configuration start_options =
  let { StartOptions.socket_path; critical_files; _ } = start_options in
  ServerProperties.create ~socket_path ~critical_files ~configuration ()
(* Compute the initial server state.

   Depending on `saved_state_action`, the state is either restored from a saved state (located
   through explicit files or through a project query that requires `watchman_subscriber`) or
   built from scratch with a full check. Every saved-state failure falls back to a cold start.
   When the action is `SaveToFile`, the freshly initialized state is additionally written to the
   given path. The result is returned as an Lwt promise. *)
let initialize_server_state
?watchman_subscriber
~build_system_initializer
~saved_state_action
({
ServerProperties.configuration = { Configuration.Analysis.log_directory; _ } as configuration;
critical_files;
_;
} as server_properties)
=
(* This is needed to initialize shared memory. *)
let _ = Memory.get_heap_handle configuration in
(* Cold start: run a full check under a scheduler, then index the resulting errors by the module
   they belong to. *)
let start_from_scratch ~build_system () =
Log.info "Initializing server state from scratch...";
let { Service.Check.environment; errors } =
Scheduler.with_scheduler ~configuration ~f:(fun scheduler ->
Service.Check.check
~scheduler
~configuration
~call_graph_builder:(module Analysis.Callgraph.DefaultBuilder))
in
let error_table =
let table = Ast.Reference.Table.create () in
let add_error error =
let key = Analysis.AnalysisError.module_reference error in
Hashtbl.add_multi table ~key ~data:error
in
List.iter errors ~f:add_error;
table
in
ServerState.create ~build_system ~type_environment:environment ~error_table ()
in
(* Run the build system initializer first, then perform a cold start with the resulting build
   system. *)
let build_and_start_from_scratch ~build_system_initializer () =
let open Lwt.Infix in
BuildSystem.Initializer.run build_system_initializer
>>= fun build_system -> Lwt.return (start_from_scratch ~build_system ())
in
(* Describe a saved state that lives at `shared_memory_path`. The optional changed-files file is
   read as one path per line; when no such file is given the list is empty. Any exception raised
   along the way is converted into `Result.Error` with a descriptive message.
   NOTE(review): `>>|` and `>>=` here come from `Pyre` and are presumably option combinators --
   confirm. *)
let fetch_saved_state_from_files ~shared_memory_path ~changed_files_path () =
try
let open Pyre in
let changed_files =
changed_files_path
>>| File.create
>>= File.content
>>| String.split_lines
>>| List.map ~f:PyrePath.create_absolute
|> Option.value ~default:[]
in
Lwt.return (Result.Ok { SavedState.Fetched.path = shared_memory_path; changed_files })
with
| exn ->
let message =
let detailed_message =
match exn with
| Watchman.ConnectionError message
| Watchman.QueryError message ->
message
| _ -> Exn.to_string exn
in
Format.sprintf "Cannot fetch saved state from file: %s" detailed_message
in
Lwt.return (Result.Error message)
in
(* Query and fetch a saved state for the given project through `SavedState.query_and_fetch_exn`,
   using a connection derived from the watchman subscriber. The fetch target is
   `new_server/server.state` under the log directory. A missing subscriber or any failure is
   turned into `Result.Error` with a descriptive message. *)
let fetch_saved_state_from_project ~project_name ~project_metadata () =
let open Lwt.Infix in
Lwt.catch
(fun () ->
match watchman_subscriber with
| None -> failwith "Watchman is not enabled"
| Some watchman_subscriber ->
let {
Watchman.Subscriber.Setting.root = watchman_root;
filter = watchman_filter;
raw;
_;
}
=
Watchman.Subscriber.setting_of watchman_subscriber
in
Watchman.Raw.with_connection raw ~f:(fun watchman_connection ->
let target =
PyrePath.create_relative ~root:log_directory ~relative:"new_server/server.state"
in
SavedState.query_and_fetch_exn
{
SavedState.Setting.watchman_root;
watchman_filter;
watchman_connection;
project_name;
project_metadata;
critical_files;
target;
}
>>= fun fetched -> Lwt.return (Result.Ok fetched)))
(fun exn ->
let message =
let detailed_message =
match exn with
| Watchman.ConnectionError message
| Watchman.QueryError message
| SavedState.SavedStateQueryFailure message ->
message
| _ -> Exn.to_string exn
in
Format.sprintf "Cannot fetch saved state from project: %s" detailed_message
in
Lwt.return (Result.Error message))
in
(* Note that this function contains some heuristics: it only attempts to perform cheap checks on
what might affect type checking result. Do *NOT* use it as a general-purpose configuration
comparator. *)
(* Concretely: the two boolean fields are compared by value, while list-valued fields are only
   compared by length (optional lists must additionally agree on being present or absent). *)
let configuration_equal
{
Configuration.Analysis.analyze_external_sources = left_analyze_external_sources;
filter_directories = left_filter_directories;
ignore_all_errors = left_ignore_all_errors;
source_paths = left_source_paths;
search_paths = left_search_paths;
taint_model_paths = left_taint_model_paths;
strict = left_strict;
excludes = left_excludes;
extensions = left_extensions;
_;
}
{
Configuration.Analysis.analyze_external_sources = right_analyze_external_sources;
filter_directories = right_filter_directories;
ignore_all_errors = right_ignore_all_errors;
source_paths = right_source_paths;
search_paths = right_search_paths;
taint_model_paths = right_taint_model_paths;
strict = right_strict;
excludes = right_excludes;
extensions = right_extensions;
_;
}
=
let list_length_equal left right = Int.equal (List.length left) (List.length right) in
let optional_list_length_equal left right =
match left, right with
| None, None -> true
| Some left, Some right when list_length_equal left right -> true
| _, _ -> false
in
Bool.equal left_analyze_external_sources right_analyze_external_sources
&& optional_list_length_equal left_filter_directories right_filter_directories
&& optional_list_length_equal left_ignore_all_errors right_ignore_all_errors
&& list_length_equal left_source_paths right_source_paths
&& list_length_equal left_search_paths right_search_paths
&& list_length_equal left_taint_model_paths right_taint_model_paths
&& Bool.equal left_strict right_strict
&& list_length_equal left_excludes right_excludes
&& list_length_equal left_extensions right_extensions
in
(* Load shared memory from `path`, turning a loading failure into `Result.Error`. *)
let load_from_shared_memory path =
try Result.Ok (Memory.load_shared_memory ~path:(PyrePath.absolute path) ~configuration) with
| Memory.SavedStateLoadingFailure message -> Result.Error message
in
(* Consume the outcome of a saved state fetch. A fetch failure, a shared memory loading failure
   and a configuration mismatch are each logged, reported as a `saved state failure` event, and
   followed by a cold start; the latter two also reset shared memory first. On success, the
   state is loaded and the changed files are replayed as an incremental update. *)
let load_from_saved_state ~build_system_initializer = function
| Result.Error message ->
Log.warning "%s" message;
Statistics.event ~name:"saved state failure" ~normals:["reason", message] ();
build_and_start_from_scratch ~build_system_initializer ()
| Result.Ok { SavedState.Fetched.path; changed_files } -> (
Log.info "Restoring environments from saved state...";
match load_from_shared_memory path with
| Result.Error message ->
Log.warning "%s" message;
Statistics.event
~name:"saved state failure"
~normals:["reason", "shared memory loading failure"]
();
Memory.reset_shared_memory ();
build_and_start_from_scratch ~build_system_initializer ()
| Result.Ok () -> (
match configuration_equal configuration (ServerState.load_stored_configuration ()) with
| false ->
(* Although this is a rare occurrence, it *is* possible for the provided
`Configuration.Analysis.t` to be different from what's stored in the saved state
even if the configuration file remained the same. If that happens, we cannot
reuse the saved state as it may lead to a server crash later. *)
Log.warning
"Cannot load saved state due to unexpected configuration change. Falling back to \
cold start...";
Statistics.event
~name:"saved state failure"
~normals:["reason", "configuration change"]
();
Memory.reset_shared_memory ();
build_and_start_from_scratch ~build_system_initializer ()
| true ->
let open Lwt.Infix in
BuildSystem.Initializer.load build_system_initializer
>>= fun build_system ->
let loaded_state = ServerState.load ~build_system () in
Log.info "Processing recent updates not included in saved state...";
Statistics.event ~name:"saved state success" ();
Request.IncrementalUpdate (List.map changed_files ~f:PyrePath.absolute)
|> RequestHandler.process_request ~properties:server_properties ~state:loaded_state
>>= fun (new_state, _) -> Lwt.return new_state))
in
let open Lwt.Infix in
(* Pick the initialization strategy from `saved_state_action`; each strategy is wrapped in
   performance logging under the name `initialization`. Any action other than the two load
   actions results in a cold start. *)
let get_initial_state ~build_system_initializer () =
match saved_state_action with
| Some (SavedStateAction.LoadFromFile { shared_memory_path; changed_files_path }) ->
with_performance_logging
~normals:["initialization method", "saved state"]
~name:"initialization"
(fun _ ->
fetch_saved_state_from_files ~shared_memory_path ~changed_files_path ()
>>= load_from_saved_state ~build_system_initializer)
| Some (SavedStateAction.LoadFromProject { project_name; project_metadata }) ->
let normals =
let normals =
["initialization method", "saved state"; "saved_state_project", project_name]
in
match project_metadata with
| None -> normals
| Some metadata -> ("saved_state_metadata", metadata) :: normals
in
with_performance_logging ~normals ~name:"initialization" (fun _ ->
fetch_saved_state_from_project ~project_name ~project_metadata ()
>>= load_from_saved_state ~build_system_initializer)
| _ ->
with_performance_logging
~normals:["initialization method", "cold start"]
~name:"initialization"
(fun _ -> build_and_start_from_scratch ~build_system_initializer ())
in
(* Persist the initial state only when the action is `SaveToFile`; do nothing otherwise. *)
let store_initial_state state =
match saved_state_action with
| Some (SavedStateAction.SaveToFile { shared_memory_path }) ->
ServerState.store ~path:shared_memory_path ~configuration state;
Log.info "Initial server state written to %a" PyrePath.pp shared_memory_path
| _ -> ()
in
get_initial_state ~build_system_initializer ()
>>= fun state ->
Log.info "Server state initialized.";
(* Memory statistics are only reported for debug configurations. *)
if configuration.debug then
Memory.report_statistics ();
store_initial_state state;
Lwt.return state
(* Create a watchman subscriber for `watchman_root`, or resolve to `None` when no root is given.
   The raw watchman handle passed as `watchman` is reused when present; otherwise a new one is
   created. *)
let get_watchman_subscriber ?watchman ~watchman_root ~critical_files ~extensions ~source_paths () =
  let open Lwt.Infix in
  let subscribe_at root =
    let raw_watchman =
      match watchman with
      | Some existing -> Lwt.return existing
      | None -> Watchman.Raw.create_exn ()
    in
    raw_watchman
    >>= fun raw ->
    let filter =
      Watchman.Filter.from_server_configurations ~critical_files ~extensions ~source_paths ()
    in
    Watchman.Subscriber.subscribe { Watchman.Subscriber.Setting.raw; root; filter }
    >>= Lwt.return_some
  in
  match watchman_root with
  | None -> Lwt.return_none
  | Some root -> subscribe_at root
(* React to a file system notification: run an incremental update for the changed paths while
   holding the write lock on the server state. *)
let on_watchman_update ~server_properties ~server_state paths =
  let open Lwt.Infix in
  let update_request = Request.IncrementalUpdate (List.map paths ~f:PyrePath.absolute) in
  let apply_update state =
    handle_request ~properties:server_properties ~state update_request
    >>= fun (new_state, _response) ->
    (* The file watcher does not care about the content of the response. *)
    Lwt.return (new_state, ())
  in
  ExclusiveLock.Lazy.write server_state ~f:apply_update
(* Bring up a server (watchman subscription, socket, lazily initialized state), hand control to
   `f`, and tear the server down once `f` -- or the watchman listener, if any -- finishes. *)
let with_server
    ?watchman
    ?build_system_initializer
    ~configuration:({ Configuration.Analysis.extensions; _ } as configuration)
    ~f
    ({ StartOptions.socket_path; source_paths; watchman_root; critical_files; saved_state_action }
    as start_options)
  =
  let open Lwt.Infix in
  (* The watchman connection has to be up before the server starts; otherwise file system updates
     happening while the server is being established could be missed. *)
  get_watchman_subscriber ?watchman ~watchman_root ~critical_files ~extensions ~source_paths ()
  >>= fun watchman_subscriber ->
  let build_system_initializer =
    match build_system_initializer with
    | None -> BuildSystem.get_initializer source_paths
    | Some provided -> provided
  in
  LwtSocketServer.PreparedSocket.create_from_path socket_path
  >>= fun prepared_socket ->
  (* The expensive server initialization is wrapped in a lazy lock so that it does not have to
     happen before client requests start being accepted. *)
  let server_properties = create_server_properties ~configuration start_options in
  let server_state =
    ExclusiveLock.Lazy.create (fun () ->
        initialize_server_state
          ?watchman_subscriber
          ~build_system_initializer
          ~saved_state_action
          server_properties)
  in
  LwtSocketServer.establish prepared_socket ~f:(handle_connection ~server_properties ~server_state)
  >>= fun server ->
  let wait_for_server () = f (socket_path, server_properties, server_state) in
  let shut_down () =
    Log.info "Server is going down. Cleaning up...";
    BuildSystem.Initializer.cleanup build_system_initializer
    >>= fun () -> LwtSocketServer.shutdown server
  in
  let serve () =
    Log.info "Server has started listening on socket `%a`" PyrePath.pp socket_path;
    match watchman_subscriber with
    | None ->
        (* Without a watchman subscriber there is only the server to wait for. *)
        wait_for_server ()
    | Some subscriber ->
        let watchman_waiter =
          Watchman.Subscriber.listen
            ~f:(on_watchman_update ~server_properties ~server_state)
            subscriber
          >>= fun () ->
          (* Losing the watchman connection is considered an error. *)
          Lwt.return ExitStatus.Error
        in
        (* When the watchman subscriber goes down, the server goes down with it. *)
        Lwt.choose [wait_for_server (); watchman_waiter]
  in
  Lwt.finalize serve shut_down
(* Invoke `on_caught` when one of the given unix signals is received.

   A handler is installed for every signal in `signals`. The first signal to arrive resolves the
   returned promise: it is logged and then passed to `on_caught`, whose result becomes the result
   of this function. Signals arriving afterwards are ignored. *)
let wait_on_signals ~on_caught signals =
  let open Lwt in
  let waiter, resolver = wait () in
  let on_signal_received signal =
    (* `Lwt.wakeup` raises `Invalid_argument` when the promise is already resolved. The handlers
       stay installed after the first signal, so only wake the promise up while it is still
       pending -- otherwise a repeated signal (e.g. a second Ctrl-C during shutdown) would raise
       from within the signal handler. *)
    if is_sleeping waiter then
      wakeup resolver signal
  in
  List.iter signals ~f:(fun signal ->
      let signal = Signal.to_system_int signal in
      Lwt_unix.on_signal signal on_signal_received |> ignore);
  waiter
  >>= fun signal ->
  let signal = Signal.of_system_int signal in
  Log.info "Server interrupted with signal `%s`" (Signal.to_string signal);
  on_caught signal
(* Start a server and run the given callbacks: `on_server_socket_ready` once the socket exists,
   then `on_started` with the server properties and (lazy) state. Exceptions raised while the
   server is being set up or is running are routed to `on_exception`. *)
let start_server
    ?watchman
    ?build_system_initializer
    ?(on_server_socket_ready = fun _ -> Lwt.return_unit)
    ~on_started
    ~on_exception
    ~configuration
    start_options
  =
  let open Lwt.Infix in
  let on_server_ready (socket_path, server_properties, server_state) =
    on_server_socket_ready socket_path >>= fun _ -> on_started server_properties server_state
  in
  let run () =
    with_server ?watchman ?build_system_initializer ~configuration ~f:on_server_ready start_options
  in
  Lwt.catch run on_exception
(* Start a server and block until it is told to stop via a signal, or until start-up fails.
   Progress (`SocketCreated`, `ServerInitialized`) and failures (`Exception`) are reported as
   events on `event_channel` when one is provided. Resolves to the exit status of the run. *)
let start_server_and_wait ?event_channel ~configuration start_options =
  let open Lwt.Infix in
  (* Emit an event if there is a channel to emit it to. A channel that has already been closed on
     the other end is tolerated; any other failure is propagated. *)
  let write_event event =
    match event_channel with
    | None -> Lwt.return_unit
    | Some output_channel ->
        let tolerate_closed_channel = function
          | Lwt_io.Channel_closed _
          | Caml.Unix.Unix_error (Caml.Unix.EPIPE, _, _) ->
              Lwt.return_unit
          | exn -> Lwt.fail exn
        in
        Lwt.catch (fun () -> ServerEvent.write ~output_channel event) tolerate_closed_channel
  in
  (* Map a start-up exception to an error kind and a human-readable message. *)
  let describe_exception exn =
    match exn with
    | Buck.Raw.BuckError { arguments; description; exit_code; additional_logs } ->
        (* Buck exit code >=10 are considered internal:
           https://buck.build/command/exit_codes.html *)
        let kind =
          match exit_code with
          | Some exit_code when exit_code < 10 -> ServerEvent.ErrorKind.BuckUser
          | _ -> ServerEvent.ErrorKind.BuckInternal
        in
        (* The reproduction command is only spelled out when the argument list is short. *)
        let reproduction_hint =
          if Buck.Raw.ArgumentList.length arguments <= 20 then
            [
              Format.sprintf
                "To reproduce this error, run `%s`."
                (Buck.Raw.ArgumentList.to_buck_command arguments);
            ]
          else
            []
        in
        let log_excerpt =
          if List.is_empty additional_logs then
            []
          else
            "Here are the last few lines of Buck log:"
            :: " ..." :: List.map additional_logs ~f:(fun line -> " " ^ line)
        in
        let details = String.concat ~sep:"\n" (List.append reproduction_hint log_excerpt) in
        kind, Format.sprintf "Cannot build the project: %s.\n%s" description details
    | Buck.Interface.JsonError message ->
        ( ServerEvent.ErrorKind.Pyre,
          Format.sprintf
            "Cannot build the project because Buck returns malformed JSON: %s"
            message )
    | Buck.Builder.LinkTreeConstructionError message ->
        ( ServerEvent.ErrorKind.Pyre,
          Format.sprintf
            "Cannot build the project because Pyre encounters a fatal error while constructing \
             a link tree: %s"
            message )
    | ChecksumMap.LoadError message ->
        ( ServerEvent.ErrorKind.Pyre,
          Format.sprintf
            "Cannot build the project because Pyre encounters a fatal error while loading \
             external wheel: %s"
            message )
    | Watchman.ConnectionError message ->
        ServerEvent.ErrorKind.Watchman, Format.sprintf "Watchman connection error: %s" message
    | Watchman.SubscriptionError message ->
        ServerEvent.ErrorKind.Watchman, Format.sprintf "Watchman subscription error: %s" message
    | Watchman.QueryError message ->
        ServerEvent.ErrorKind.Watchman, Format.sprintf "Watchman query error: %s" message
    | Unix.Unix_error (Unix.EADDRINUSE, _, _) ->
        ( ServerEvent.ErrorKind.Pyre,
          "A Pyre server is already running for the current project. Use `pyre stop` to stop \
           it before starting another one." )
    | _ -> ServerEvent.ErrorKind.Unknown, Exn.to_string exn
  in
  (* A `SocketCreated` event signals that the server socket has been created. *)
  let on_server_socket_ready socket_path = write_event (ServerEvent.SocketCreated socket_path) in
  (* Force the server state to be initialized, announce it, then wait for a signal. *)
  let on_started { ServerProperties.start_time; _ } server_state =
    ExclusiveLock.Lazy.force server_state
    >>= fun _ ->
    write_event ServerEvent.ServerInitialized
    >>= fun () ->
    Lwt.choose
      [
        (* SIGINT is what a normal server shutdown relies on. *)
        wait_on_signals [Signal.int] ~on_caught:(fun _ -> Lwt.return ExitStatus.Ok);
        (* Getting any of these signals usually indicates something serious went wrong. *)
        wait_on_signals
          [Signal.abrt; Signal.term; Signal.quit; Signal.segv]
          ~on_caught:(fun signal ->
            Stop.log_stopped_server ~reason:(Signal.to_string signal) ~start_time ();
            Lwt.return ExitStatus.Error);
      ]
  in
  (* Log the failure, report it as an `Exception` event, and finish with an error status. *)
  let on_exception exn =
    let kind, message = describe_exception exn in
    Log.info "%s" message;
    write_event (ServerEvent.Exception (message, kind))
    >>= fun () -> Lwt.return ExitStatus.Error
  in
  start_server ~on_server_socket_ready ~on_started ~on_exception ~configuration start_options