client/commands/server_event.py (113 lines of code) (raw):
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import dataclasses
import enum
import json
from pathlib import Path
from typing import List, Optional, Union, IO
from . import async_server_connection
@dataclasses.dataclass
class SocketCreated:
socket_path: Path
@dataclasses.dataclass
class ServerInitialized:
pass
class ErrorKind(enum.Enum):
WATCHMAN = "Watchman"
BUCK_INTERNAL = "BuckInternal"
BUCK_USER = "BuckUser"
PYRE = "Pyre"
UNKNOWN = "Unknown"
def __str__(self) -> str:
return self.value
@staticmethod
def from_string(input_string: str) -> "ErrorKind":
for item in ErrorKind:
if input_string == str(item):
return item
return ErrorKind.UNKNOWN
@dataclasses.dataclass
class ServerException:
message: str
kind: ErrorKind = ErrorKind.UNKNOWN
Event = Union[SocketCreated, ServerInitialized, ServerException]
def create_from_string(input_string: str) -> Optional[Event]:
try:
input_json: List[str] = json.loads(input_string)
if len(input_json) < 1:
return None
input_kind = input_json[0]
if input_kind == "SocketCreated":
if len(input_json) < 2:
return None
else:
return SocketCreated(socket_path=Path(input_json[1]))
elif input_kind == "ServerInitialized":
return ServerInitialized()
elif input_kind == "Exception":
if len(input_json) < 2:
return None
if not isinstance(input_json[1], str):
return None
if (
len(input_json) >= 3
and isinstance(input_json[2], list)
and len(input_json[2]) > 0
and isinstance(input_json[2][0], str)
):
return ServerException(
message=input_json[1], kind=ErrorKind.from_string(input_json[2][0])
)
return ServerException(message=input_json[1], kind=ErrorKind.UNKNOWN)
else:
return None
except json.JSONDecodeError:
return None
class EventParsingException(Exception):
pass
class ServerStartException(Exception):
kind: ErrorKind
def __init__(self, exception_event: ServerException) -> None:
super().__init__(exception_event.message)
self.kind = exception_event.kind
def _parse_server_event(event_string: str) -> Event:
event = create_from_string(event_string)
if event is None:
raise EventParsingException(
f"Unrecognized status update from server: {event_string}"
)
elif isinstance(event, ServerException):
raise ServerStartException(event)
return event
class Waiter:
wait_on_initialization: bool
def __init__(self, wait_on_initialization: bool) -> None:
self.wait_on_initialization = wait_on_initialization
def wait_on(self, event_stream: IO[str]) -> None:
"""
Read from the given input channel, expecting server events there.
If `self.wait_on_initialization` is false, block until server socket
creation and returns.
Otherwise, block until server initialization has finished and returns.
If data obtained from the input channel does not conform to the server
event format, raise `EventParsingException`. If an error event is received,
raise `ServerStartException`.
"""
# The first event is expected to be socket creation
initial_event = _parse_server_event(event_stream.readline().strip())
if isinstance(initial_event, SocketCreated):
if not self.wait_on_initialization:
return
# The second event is expected to be server initialization
second_event = _parse_server_event(event_stream.readline().strip())
if isinstance(second_event, ServerInitialized):
return
raise EventParsingException(
f"Unexpected second server status update: {second_event}"
)
raise EventParsingException(
f"Unexpected initial server status update: {initial_event}"
)
# This method does the same thing as `wait_on` except it operates on asyncio
# streams rather than synchronoized streams.
# NOTE: Any changes inside `wait_on` need to be applied here as well.
async def async_wait_on(
self, event_stream: async_server_connection.TextReader
) -> None:
"""
This method is the same as `wait_on`, except it operates on async input
channels instead of synchronous ones.
"""
initial_event = _parse_server_event((await event_stream.readline()).strip())
if isinstance(initial_event, SocketCreated):
if not self.wait_on_initialization:
return
second_event = _parse_server_event((await event_stream.readline()).strip())
if isinstance(second_event, ServerInitialized):
return
raise EventParsingException(
f"Unexpected second server status update: {second_event}"
)
raise EventParsingException(
f"Unexpected initial server status update: {initial_event}"
)