pyredex/buck.py (130 lines of code) (raw):

#!/usr/bin/env python3 # Copyright (c) Meta Platforms, Inc. and affiliates. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. # pyre-strict import json import logging import os import platform import typing from io import TextIOWrapper # Data class for a running step. class _RunningPart: def __init__(self, event_id: int, name: str, desc: str) -> None: self.event_id = event_id self.name = name self.desc = desc # Handle connection to buck. Manage running parts (assumed to have stack discipline). class BuckConnection: EVENT_TYPE_STEP = '{"eventType": "STEP_EVENT"}' def __init__(self) -> None: self.has_buck: typing.Optional[bool] = None self.action_id = "" self.event_id = 0 self.event_pipe: typing.Optional[TextIOWrapper] = None self.running_parts: typing.List[_RunningPart] = [] def connect(self) -> None: assert self.has_buck is None if ( "BUCK_EVENT_PIPE" not in os.environ or "BUCK_EVENT_PIPE" not in os.environ or "BUCK_ACTION_ID" not in os.environ # TODO(T103482589) Work around an issue on macs. or platform.system() == "Darwin" ): self.has_buck = False return self.has_buck = True self.action_id = os.environ["BUCK_ACTION_ID"] try: self.__open_pipe() self.__init_message() except BaseException as e: logging.warning("Failed to connect to buck: %s", e) self.has_buck = False def is_connected(self) -> bool: return self.has_buck is not None def disconnect(self) -> None: local = self.event_pipe if local: local.close() def __open_pipe(self) -> None: event_path = os.path.abspath(os.environ["BUCK_EVENT_PIPE"]) # Need to go low-level for non-blocking connection. fd = os.open(event_path, os.O_WRONLY | os.O_NONBLOCK) if fd < 0: raise RuntimeError(f"Could not open pipe to {event_path}") self.event_pipe = open(fd, mode="w") # noqa(P201) def __init_message(self) -> None: local = self.event_pipe assert local local.write("j") local.write(os.linesep) local.flush() def __create_step_message(self, event: _RunningPart, status: str) -> str: message = { "event_id": event.event_id, "step_status": status, "step_type": event.name, "description": event.desc, "action_id": self.action_id, } return json.dumps(message) def __send_step(self, event: _RunningPart, status: str) -> None: message = self.__create_step_message(event, status) local = self.event_pipe if not local: return try: local.write(str(len(BuckConnection.EVENT_TYPE_STEP))) local.write(os.linesep) local.write(BuckConnection.EVENT_TYPE_STEP) local.write(str(len(message))) local.write(os.linesep) local.write(message) local.flush() except (BrokenPipeError, BlockingIOError) as e: logging.error("Buck pipe is broken! %s", e) self.has_buck = False self.event_pipe = None def start(self, name: str, desc: str) -> None: if not self.has_buck: return part = _RunningPart(self.event_id, name, desc) self.event_id += 1 self.__send_step(part, "STARTED") self.running_parts.append(part) def end(self) -> None: if not self.has_buck: return if not self.running_parts: return part = self.running_parts.pop() self.__send_step(part, "FINISHED") def size(self) -> int: return len(self.running_parts) def end_all(self, down_to: typing.Optional[int] = None) -> None: if not self.has_buck: return left = 0 if not down_to else max(0, down_to) while len(self.running_parts) > left: part = self.running_parts.pop() self.__send_step(part, "FINISHED") _BUCK_CONNECTION = BuckConnection() def get_buck_connection() -> BuckConnection: return _BUCK_CONNECTION class BuckConnectionScope: def __init__(self) -> None: self.was_connected = False self.num_parts = 0 pass def __enter__(self) -> BuckConnection: self.was_connected = _BUCK_CONNECTION.is_connected() if not self.was_connected: _BUCK_CONNECTION.connect() self.num_parts = _BUCK_CONNECTION.size() return _BUCK_CONNECTION def __exit__(self, *args: typing.Any) -> None: _BUCK_CONNECTION.end_all(self.num_parts) if not self.was_connected: _BUCK_CONNECTION.disconnect() class BuckPartScope: def __init__(self, name: str, desc: str) -> None: self.name = name self.desc = desc def __enter__(self) -> None: _BUCK_CONNECTION.start(self.name, self.desc) def __exit__(self, *args: typing.Any) -> None: _BUCK_CONNECTION.end()