chatlearn/utils/log_monitor.py
# Copyright 2024 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""This file is modified based on ray/_private/log_monitor.py."""
import argparse
import errno
import logging
import logging.handlers
import os
import platform
import re
import shutil
import time
import traceback
from typing import Callable, List, Set
from packaging.version import parse as parse_version
import ray
import ray.util.state
from ray._private import ray_constants
from ray._private import services
import ray._private.utils
from ray._private.ray_logging import setup_component_logger
from ray._private.worker import print_to_stdstream
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from chatlearn.utils.utils import get_ray_status
# pylint: disable=unexpected-keyword-arg
# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray provides a default configuration at
# entry/init points.
logger = logging.getLogger(__name__)
# The capture groups are the job id and the worker pid.
WORKER_LOG_PATTERN = re.compile(r".*worker.*-([0-9a-f]+)-(\d+)")
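# For a typical worker log name such as "worker-<worker_id>-<job_id>-<pid>.out",
# group(1) is the job id and group(2) is the pid.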
# The capture group is the job id.
RUNTIME_ENV_SETUP_PATTERN = re.compile(r".*runtime_env_setup-(\d+).log")
# Log name update interval under pressure.
# We need it because log name update is CPU intensive and uses 100%
# of cpu when there are many log files.
MIN_LOG_NAME_UPDATE_INTERVAL_S = 0.5
MAX_LOG_NAME_UPDATE_INTERVAL_S = 5
LOG_NAME_UPDATE_INTERVAL_S = float(os.getenv("LOG_NAME_UPDATE_INTERVAL_S", "0.5"))
# Once there are more files than this threshold, the log monitor starts
# applying backpressure to lower CPU usage.
RAY_LOG_MONITOR_MANY_FILES_THRESHOLD = int(
os.getenv("RAY_LOG_MONITOR_MANY_FILES_THRESHOLD", "1000")
)
RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED = int(
os.getenv("RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED", "0")
)
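# The getenv-backed settings above can be tuned through environment variables
# before the monitor starts, e.g. (illustrative values):
#   export LOG_NAME_UPDATE_INTERVAL_S=1.0
#   export RAY_LOG_MONITOR_MANY_FILES_THRESHOLD=2000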
@ray.remote
class LogActor:
"""LogActor"""
def list_logs(self, node_id, retry=3):
logs = {}
msg = None
for _ in range(retry):
try:
return ray.util.state.list_logs(node_id=node_id)
except Exception as e: # pylint: disable=redefined-outer-name
msg = e
time.sleep(1)
print(f"fail to list_logs, error msg: {msg}, retry ...", flush=True)
return logs
class LogFileInfo:
"""LogFileInfo"""
def __init__(
self,
filename=None,
size_when_last_opened=None,
file_position=None,
file_handle=None,
is_err_file=False,
job_id=None,
worker_pid=None,
):
assert (
filename is not None
and size_when_last_opened is not None
and file_position is not None
)
self.filename = filename
self.size_when_last_opened = size_when_last_opened
self.file_position = file_position
self.file_handle = file_handle
self.is_err_file = is_err_file
self.job_id = job_id
self.worker_pid = worker_pid
self.actor_name = None
self.task_name = None
def reopen_if_necessary(self):
"""Check if the file's inode has changed and reopen it if necessary.
There are a variety of reasons what we would logically consider a file
would have different inodes, such as log rotation or file syncing
semantics.
"""
try:
open_inode = None
if self.file_handle and not self.file_handle.closed:
open_inode = os.fstat(self.file_handle.fileno()).st_ino
new_inode = os.stat(self.filename).st_ino
if open_inode != new_inode:
self.file_handle = open(self.filename, "rb") # pylint: disable=consider-using-with
self.file_handle.seek(self.file_position)
except Exception:
logger.debug(f"file no longer exists, skip re-opening of {self.filename}")
def __repr__(self):
return (
"FileInfo(\n"
f"\tfilename: {self.filename}\n"
f"\tsize_when_last_opened: {self.size_when_last_opened}\n"
f"\tfile_position: {self.file_position}\n"
f"\tfile_handle: {self.file_handle}\n"
f"\tis_err_file: {self.is_err_file}\n"
f"\tjob_id: {self.job_id}\n"
f"\tworker_pid: {self.worker_pid}\n"
f"\tactor_name: {self.actor_name}\n"
f"\ttask_name: {self.task_name}\n"
")"
)
class LogMonitor:
"""A monitor process for monitoring Ray log files.
This class maintains a list of open files and a list of closed log files. We
can't simply leave all files open because we'll run out of file
descriptors.
The "run" method of this class will cycle between doing several things:
1. First, it will check if any new files have appeared in the log
directory. If so, they will be added to the list of closed files.
2. Then, if we are unable to open any new files, we will close all of the
files.
3. Then, we will open as many closed files as we can that may have new
lines (judged by an increase in file size since the last time the file
was opened).
4. Then we will loop through the open files and see if there are any new
lines in the file. If so, we will publish them to Ray pubsub.
Attributes:
ip: The hostname of this machine, for grouping log messages.
logs_dir: The directory that the log files are in.
log_filenames: This is the set of filenames of all files in
open_file_infos and closed_file_infos.
open_file_infos (list[LogFileInfo]): Info for all of the open files.
closed_file_infos (list[LogFileInfo]): Info for all of the closed
files.
can_open_more_files: True if we can still open more files and
false otherwise.
max_files_open: The maximum number of files that can be open.
"""
def __init__(
self,
logs_dir,
is_proc_alive_fn: Callable[[int], bool],
log_actor=None,
max_files_open: int = ray_constants.LOG_MONITOR_MAX_OPEN_FILES,
):
"""Initialize the log monitor object."""
ray_context = ray.get_runtime_context()
self.node_id = ray_context.get_node_id()
self.ip: str = services.get_node_ip_address()
self.logs_dir: str = logs_dir
self.log_filenames: Set[str] = set()
self.open_file_infos: List[LogFileInfo] = []
self.closed_file_infos: List[LogFileInfo] = []
self.can_open_more_files: bool = True
self.max_files_open: int = max_files_open
self.is_proc_alive_fn: Callable[[int], bool] = is_proc_alive_fn
self.log_actor = log_actor
self.need_quit = False
self.log_files = []
logger.info(
f"Starting log monitor with [max open files={max_files_open}]."
)
def _close_all_files(self):
"""Close all open files (so that we can open more)."""
while len(self.open_file_infos) > 0:
file_info = self.open_file_infos.pop(0)
file_info.file_handle.close()
file_info.file_handle = None
proc_alive = True
# Test if the worker process that generated the log file
# is still alive. Only applies to worker processes.
# For all other system components, we always assume they are alive.
if (
file_info.worker_pid != "raylet"
and file_info.worker_pid != "gcs_server"
and file_info.worker_pid != "autoscaler"
and file_info.worker_pid != "runtime_env"
and file_info.worker_pid is not None
):
assert not isinstance(file_info.worker_pid, str), (
"PID should be an int type. " f"Given PID: {file_info.worker_pid}."
)
proc_alive = self.is_proc_alive_fn(file_info.worker_pid)
if not proc_alive:
# The process is not alive any more, so move the log file
# out of the log directory so glob.glob will not be slowed
# by it.
target = os.path.join(
self.logs_dir, "old", os.path.basename(file_info.filename)
)
try:
shutil.move(file_info.filename, target)
except (IOError, OSError) as e: # pylint: disable=redefined-outer-name
if e.errno == errno.ENOENT:
logger.warning(
f"Warning: The file {file_info.filename} was not found."
)
else:
raise e
if proc_alive:
self.closed_file_infos.append(file_info)
self.can_open_more_files = True
    def has_log_file_list_changed(self, logs):
        """Return True if the log listing reported by the log actor differs
        from the one cached in ``self.log_files``."""
        if len(self.log_files) != len(logs):
            return True
        for category, files in logs.items():
            # Compare the file names as well as the categories, so that new
            # files within an existing category are also detected.
            if category not in self.log_files or self.log_files[category] != files:
                return True
        return False
def update_log_filenames(self):
"""Update the list of log files to monitor."""
monitor_log_paths = []
        # Check the cluster status first to avoid the error raised by Ray:
        # "The core worker has already been shutdown."
cluster_state, msg = get_ray_status()
if cluster_state:
            # The cluster is reachable but returned an unexpected status
            # message; skip this update and retry on the next cycle.
if msg is not None:
return
try:
# TODO: try to reduce this frequency
logs = ray.get(self.log_actor.list_logs.remote(self.node_id))
global LOG_NAME_UPDATE_INTERVAL_S
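                # Adaptive backoff: if the reported file list is unchanged,
                # double the refresh interval (capped at
                # MAX_LOG_NAME_UPDATE_INTERVAL_S); otherwise halve it
                # (floored at MIN_LOG_NAME_UPDATE_INTERVAL_S).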
if not self.has_log_file_list_changed(logs):
LOG_NAME_UPDATE_INTERVAL_S = min(LOG_NAME_UPDATE_INTERVAL_S * 2, MAX_LOG_NAME_UPDATE_INTERVAL_S)
else:
LOG_NAME_UPDATE_INTERVAL_S = max(LOG_NAME_UPDATE_INTERVAL_S / 2, MIN_LOG_NAME_UPDATE_INTERVAL_S)
self.log_files = logs
except (ray.exceptions.RayActorError, AttributeError):
# AttributeError: 'NoneType' object has no attribute 'address_info'
logger.info("log_actor exit, quit the LogMonitor")
self.need_quit = True
return
else:
self.need_quit = True
return
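        # Monitor stdout/stderr logs of every worker, plus the .err logs of
        # gcs_server and raylet.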
for key in ['worker_out', 'worker_err']:
monitor_log_paths += [f"{self.logs_dir}/{fn}" for fn in logs.get(key, [])]
for key in ['gcs_server', 'raylet']:
# record only err
for fn in logs.get(key, []):
if fn.endswith('.err'):
monitor_log_paths.append(f"{self.logs_dir}/{fn}")
for file_path in monitor_log_paths:
if os.path.isfile(file_path) and file_path not in self.log_filenames:
worker_match = WORKER_LOG_PATTERN.match(file_path)
if worker_match:
worker_pid = int(worker_match.group(2))
else:
worker_pid = None
job_id = None
                # Check for the substring first because most files are not
                # runtime_env logs; this saves some CPU cycles.
if "runtime_env" in file_path:
runtime_env_job_match = RUNTIME_ENV_SETUP_PATTERN.match(file_path)
if runtime_env_job_match:
job_id = runtime_env_job_match.group(1)
is_err_file = file_path.endswith("err")
self.log_filenames.add(file_path)
self.closed_file_infos.append(
LogFileInfo(
filename=file_path,
size_when_last_opened=0,
file_position=0,
file_handle=None,
is_err_file=is_err_file,
job_id=job_id,
worker_pid=worker_pid,
)
)
def open_closed_files(self):
"""Open some closed files if they may have new lines.
Opening more files may require us to close some of the already open
files.
"""
if not self.can_open_more_files:
            # If we can't open any more files, close all of the files.
self._close_all_files()
files_with_no_updates = []
while len(self.closed_file_infos) > 0:
if len(self.open_file_infos) >= self.max_files_open:
self.can_open_more_files = False
break
file_info = self.closed_file_infos.pop(0)
assert file_info.file_handle is None
# Get the file size to see if it has gotten bigger since we last
# opened it.
try:
file_size = os.path.getsize(file_info.filename)
except (IOError, OSError) as e: # pylint: disable=redefined-outer-name
# Catch "file not found" errors.
if e.errno == errno.ENOENT:
logger.warning(
f"Warning: The file {file_info.filename} was not found."
)
self.log_filenames.remove(file_info.filename)
continue
raise e
# If some new lines have been added to this file, try to reopen the
# file.
if file_size > file_info.size_when_last_opened:
try:
f = open(file_info.filename, "rb") # pylint: disable=consider-using-with
except (IOError, OSError) as e:
if e.errno == errno.ENOENT:
logger.warning(
f"Warning: The file {file_info.filename} was not found."
)
self.log_filenames.remove(file_info.filename)
continue
raise e
f.seek(file_info.file_position)
file_info.size_when_last_opened = file_size
file_info.file_handle = f
self.open_file_infos.append(file_info)
else:
files_with_no_updates.append(file_info)
if len(self.open_file_infos) >= self.max_files_open:
self.can_open_more_files = False
# Add the files with no changes back to the list of closed files.
self.closed_file_infos += files_with_no_updates
def check_log_files_and_publish_updates(self):
"""Gets updates to the log files and publishes them.
Returns:
True if anything was published and false otherwise.
"""
anything_published = False
lines_to_publish = []
if ray.__version__ < "2.11.0":
raise ValueError("Just support ray version >= 2.11.0")
def flush():
nonlocal lines_to_publish
nonlocal anything_published
if len(lines_to_publish) > 0:
data = {
"ip": self.ip,
"pid": file_info.worker_pid,
"job": file_info.job_id,
"is_err": file_info.is_err_file,
"lines": lines_to_publish,
"actor_name": file_info.actor_name,
"task_name": file_info.task_name,
}
if ray.__version__ >= "2.38.0":
print_to_stdstream(data, ignore_prefix=False)
else:
print_to_stdstream(data)
anything_published = True
lines_to_publish = []
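        # Note: flush() closes over `file_info`, which is bound by the loop
        # below; the final flush() after the loop publishes any remaining
        # lines for the last file processed.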
for file_info in self.open_file_infos:
assert not file_info.file_handle.closed
file_info.reopen_if_necessary()
max_num_lines_to_read = ray_constants.LOG_MONITOR_NUM_LINES_TO_READ
for _ in range(max_num_lines_to_read):
try:
next_line = file_info.file_handle.readline()
# Replace any characters not in UTF-8 with
# a replacement character, see
# https://stackoverflow.com/a/38565489/10891801
next_line = next_line.decode("utf-8", "replace")
if next_line == "":
break
next_line = next_line.rstrip("\r\n")
if next_line.startswith(ray_constants.LOG_PREFIX_ACTOR_NAME):
flush() # Possible change of task/actor name.
file_info.actor_name = next_line.split(
ray_constants.LOG_PREFIX_ACTOR_NAME, 1
)[1]
file_info.task_name = None
elif next_line.startswith(ray_constants.LOG_PREFIX_TASK_NAME):
flush() # Possible change of task/actor name.
file_info.task_name = next_line.split(
ray_constants.LOG_PREFIX_TASK_NAME, 1
)[1]
elif next_line.startswith(ray_constants.LOG_PREFIX_JOB_ID):
file_info.job_id = next_line.split(
ray_constants.LOG_PREFIX_JOB_ID, 1
)[1]
elif next_line.startswith(
"Windows fatal exception: access violation"
):
# We are suppressing the
# 'Windows fatal exception: access violation'
# message on workers on Windows here.
# As far as we know it is harmless,
# but is frequently popping up if Python
# functions are run inside the core
# worker C extension. See the investigation in
# github.com/ray-project/ray/issues/18944
# Also skip the following line, which is an
# empty line.
file_info.file_handle.readline()
else:
lines_to_publish.append(next_line)
except Exception:
logger.error(
f"Error: Reading file: {file_info.filename}, "
f"position: {file_info.file_handle.tell()} "
"failed."
)
raise
if file_info.file_position == 0:
# make filename windows-agnostic
filename = file_info.filename.replace("\\", "/")
if "/raylet" in filename:
file_info.worker_pid = "raylet"
elif "/gcs_server" in filename:
file_info.worker_pid = "gcs_server"
elif "/monitor" in filename or "event_AUTOSCALER" in filename:
file_info.worker_pid = "autoscaler"
elif "/runtime_env" in filename:
file_info.worker_pid = "runtime_env"
# Record the current position in the file.
file_info.file_position = file_info.file_handle.tell()
flush()
return anything_published
def should_update_filenames(self, last_file_updated_time: float) -> bool:
"""Return true if filenames should be updated.
This method is used to apply the backpressure on file updates because
that requires heavy glob operations which use lots of CPUs.
Args:
last_file_updated_time: The last time filenames are updated.
Returns:
True if filenames should be updated. False otherwise.
"""
elapsed_seconds = float(time.time() - last_file_updated_time)
return (
len(self.log_filenames) < RAY_LOG_MONITOR_MANY_FILES_THRESHOLD
or elapsed_seconds > LOG_NAME_UPDATE_INTERVAL_S
)
def run(self, quit_event=None):
"""Run the log monitor.
This will scan the file system once every LOG_NAME_UPDATE_INTERVAL_S to
check if there are new log files to monitor. It will also publish new
log lines.
"""
last_updated = time.time()
while not self.need_quit:
if self.should_update_filenames(last_updated):
self.update_log_filenames()
last_updated = time.time()
self.open_closed_files()
anything_published = self.check_log_files_and_publish_updates()
# If nothing was published, then wait a little bit before checking
# for logs to avoid using too much CPU.
if not anything_published:
time.sleep(0.1)
if quit_event is not None and quit_event.is_set():
logger.info("============= quit_event is set, quit the LogMonitor =============")
break
def is_proc_alive(pid):
# Import locally to make sure the bundled version is used if needed
import psutil # pylint: disable=import-outside-toplevel
try:
return psutil.Process(pid).is_running()
except psutil.NoSuchProcess:
# The process does not exist.
return False
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=("Parse GCS server address for the log monitor to connect to.")
)
parser.add_argument(
"--logging-level",
required=False,
type=str,
default=ray_constants.LOGGER_LEVEL,
choices=ray_constants.LOGGER_LEVEL_CHOICES,
help=ray_constants.LOGGER_LEVEL_HELP,
)
parser.add_argument(
"--logging-format",
required=False,
type=str,
default=ray_constants.LOGGER_FORMAT,
help=ray_constants.LOGGER_FORMAT_HELP,
)
parser.add_argument(
"--logging-filename",
required=False,
type=str,
default=ray_constants.LOG_MONITOR_LOG_FILE_NAME,
help="Specify the name of log file, "
"log to stdout if set empty, default is "
f'"{ray_constants.LOG_MONITOR_LOG_FILE_NAME}"',
)
parser.add_argument(
"--logs-dir",
required=True,
type=str,
help="Specify the path of the temporary directory used by Ray processes.",
)
parser.add_argument(
"--logging-rotate-bytes",
required=False,
type=int,
default=ray_constants.LOGGING_ROTATE_BYTES,
help="Specify the max bytes for rotating "
"log file, default is "
f"{ray_constants.LOGGING_ROTATE_BYTES} bytes.",
)
parser.add_argument(
"--logging-rotate-backup-count",
required=False,
type=int,
default=ray_constants.LOGGING_ROTATE_BACKUP_COUNT,
help="Specify the backup count of rotated log file, default is "
f"{ray_constants.LOGGING_ROTATE_BACKUP_COUNT}.",
)
args = parser.parse_args()
setup_component_logger(
logging_level=args.logging_level,
logging_format=args.logging_format,
log_dir=args.logs_dir,
filename=args.logging_filename,
max_bytes=args.logging_rotate_bytes,
backup_count=args.logging_rotate_backup_count,
)
ray.init()
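    # Pin the LogActor to the head node. This assumes exactly one node whose
    # NodeManagerHostname contains 'master' (a deployment convention here);
    # the lookup below raises IndexError if no such node exists.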
_node_id = [node for node in ray.nodes() if 'master' in node['NodeManagerHostname']][0]['NodeID']
actor = LogActor.options(
name='_LOG_ACTOR_NAME',
scheduling_strategy=NodeAffinitySchedulingStrategy(
node_id=_node_id,
            soft=False,
)
).remote()
log_monitor = LogMonitor(
args.logs_dir,
is_proc_alive,
actor
)
try:
log_monitor.run()
except Exception as e:
# Something went wrong, so push an error to all drivers.
traceback_str = ray._private.utils.format_error_message(traceback.format_exc())
message = (
f"The log monitor on node {platform.node()} "
f"failed with the following error:\n{traceback_str}"
)
logger.error(message)
raise e