in chatlearn/utils/log_monitor.py [0:0]
def update_log_filenames(self):
"""Update the list of log files to monitor."""
monitor_log_paths = []
    # check the cluster status first to avoid the error raised by Ray:
    # "The core worker has already been shutdown."
cluster_state, msg = get_ray_status()
if cluster_state:
        # the cluster is reachable but returned an unexpected status message; skip this round
if msg is not None:
return
try:
# TODO: try to reduce this frequency
logs = ray.get(self.log_actor.list_logs.remote(self.node_id))
global LOG_NAME_UPDATE_INTERVAL_S
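            # Adapt the polling interval: back off (up to the max) while the
            # file list stays unchanged, and tighten it again once it changes.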
if not self.has_log_file_list_changed(logs):
LOG_NAME_UPDATE_INTERVAL_S = min(LOG_NAME_UPDATE_INTERVAL_S * 2, MAX_LOG_NAME_UPDATE_INTERVAL_S)
else:
LOG_NAME_UPDATE_INTERVAL_S = max(LOG_NAME_UPDATE_INTERVAL_S / 2, MIN_LOG_NAME_UPDATE_INTERVAL_S)
self.log_files = logs
except (ray.exceptions.RayActorError, AttributeError):
# AttributeError: 'NoneType' object has no attribute 'address_info'
            logger.info("log_actor exited, quitting the LogMonitor")
self.need_quit = True
return
else:
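        # the Ray cluster is no longer reachable, so stop the monitor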
self.need_quit = True
return
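    # always monitor worker stdout/stderr logs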
for key in ['worker_out', 'worker_err']:
monitor_log_paths += [f"{self.logs_dir}/{fn}" for fn in logs.get(key, [])]
for key in ['gcs_server', 'raylet']:
# record only err
for fn in logs.get(key, []):
if fn.endswith('.err'):
monitor_log_paths.append(f"{self.logs_dir}/{fn}")
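    # register files we have not seen before so they are tailed from the beginning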
for file_path in monitor_log_paths:
if os.path.isfile(file_path) and file_path not in self.log_filenames:
worker_match = WORKER_LOG_PATTERN.match(file_path)
if worker_match:
worker_pid = int(worker_match.group(2))
else:
worker_pid = None
job_id = None
            # Perform the substring check first because most files will not
            # include runtime_env. This saves some CPU cycles.
if "runtime_env" in file_path:
runtime_env_job_match = RUNTIME_ENV_SETUP_PATTERN.match(file_path)
if runtime_env_job_match:
job_id = runtime_env_job_match.group(1)
is_err_file = file_path.endswith("err")
self.log_filenames.add(file_path)
self.closed_file_infos.append(
LogFileInfo(
filename=file_path,
size_when_last_opened=0,
file_position=0,
file_handle=None,
is_err_file=is_err_file,
job_id=job_id,
worker_pid=worker_pid,
)
)