in chatlearn/utils/log_monitor.py [0:0]
def check_log_files_and_publish_updates(self):
"""Gets updates to the log files and publishes them.
Returns:
True if anything was published and false otherwise.
"""
anything_published = False
lines_to_publish = []
if ray.__version__ < "2.11.0":
raise ValueError("Just support ray version >= 2.11.0")
def flush():
nonlocal lines_to_publish
nonlocal anything_published
if len(lines_to_publish) > 0:
data = {
"ip": self.ip,
"pid": file_info.worker_pid,
"job": file_info.job_id,
"is_err": file_info.is_err_file,
"lines": lines_to_publish,
"actor_name": file_info.actor_name,
"task_name": file_info.task_name,
}
if ray.__version__ >= "2.38.0":
print_to_stdstream(data, ignore_prefix=False)
else:
print_to_stdstream(data)
anything_published = True
lines_to_publish = []
for file_info in self.open_file_infos:
assert not file_info.file_handle.closed
file_info.reopen_if_necessary()
max_num_lines_to_read = ray_constants.LOG_MONITOR_NUM_LINES_TO_READ
for _ in range(max_num_lines_to_read):
try:
next_line = file_info.file_handle.readline()
# Replace any characters not in UTF-8 with
# a replacement character, see
# https://stackoverflow.com/a/38565489/10891801
next_line = next_line.decode("utf-8", "replace")
if next_line == "":
break
next_line = next_line.rstrip("\r\n")
if next_line.startswith(ray_constants.LOG_PREFIX_ACTOR_NAME):
flush() # Possible change of task/actor name.
file_info.actor_name = next_line.split(
ray_constants.LOG_PREFIX_ACTOR_NAME, 1
)[1]
file_info.task_name = None
elif next_line.startswith(ray_constants.LOG_PREFIX_TASK_NAME):
flush() # Possible change of task/actor name.
file_info.task_name = next_line.split(
ray_constants.LOG_PREFIX_TASK_NAME, 1
)[1]
elif next_line.startswith(ray_constants.LOG_PREFIX_JOB_ID):
file_info.job_id = next_line.split(
ray_constants.LOG_PREFIX_JOB_ID, 1
)[1]
elif next_line.startswith(
"Windows fatal exception: access violation"
):
# We are suppressing the
# 'Windows fatal exception: access violation'
# message on workers on Windows here.
# As far as we know it is harmless,
# but is frequently popping up if Python
# functions are run inside the core
# worker C extension. See the investigation in
# github.com/ray-project/ray/issues/18944
# Also skip the following line, which is an
# empty line.
file_info.file_handle.readline()
else:
lines_to_publish.append(next_line)
except Exception:
logger.error(
f"Error: Reading file: {file_info.filename}, "
f"position: {file_info.file_handle.tell()} "
"failed."
)
raise
if file_info.file_position == 0:
# make filename windows-agnostic
filename = file_info.filename.replace("\\", "/")
if "/raylet" in filename:
file_info.worker_pid = "raylet"
elif "/gcs_server" in filename:
file_info.worker_pid = "gcs_server"
elif "/monitor" in filename or "event_AUTOSCALER" in filename:
file_info.worker_pid = "autoscaler"
elif "/runtime_env" in filename:
file_info.worker_pid = "runtime_env"
# Record the current position in the file.
file_info.file_position = file_info.file_handle.tell()
flush()
return anything_published