in src/braket/aws/aws_quantum_job.py [0:0]
def logs(self, wait: bool = False, poll_interval_seconds: int = 5) -> None:
    """Display logs for a given job, optionally tailing them until job is complete.

    If the output is a tty or a Jupyter cell, it will be color-coded
    based on which instance the log entry is from.

    The job status is only queried when `wait` is `True`. This method does not itself
    raise when the job ends in a failed state; it simply stops tailing once the job
    reaches any terminal state. Exceptions raised by the underlying status or log calls
    are not caught here and propagate to the caller.

    Args:
        wait (bool): `True` to keep looking for new log entries until the job completes;
            otherwise `False`. Default: `False`.
        poll_interval_seconds (int): The interval of time, in seconds, between polling for
            new log entries and job completion (default: 5).
    """
    # The loop below implements a state machine that alternates between checking the job
    # status and reading whatever is available in the logs at this point. Note that if we
    # were called with wait == False, we never check the job status.
    #
    # If wait == True and the job is not completed, the initial state is TAILING.
    # If wait == False, the initial state is COMPLETE (doesn't matter if the job really is
    # complete).
    #
    # The state table:
    #
    # STATE          ACTIONS                        CONDITION      NEW STATE
    # -------------  -----------------------------  -------------  -------------
    # TAILING        Read logs, Pause, Get status   Job complete   JOB_COMPLETE
    #                                               Else           TAILING
    # JOB_COMPLETE   Read logs, Pause               Any            COMPLETE
    # COMPLETE       Read logs, Exit                               N/A
    #
    # Notes:
    # - The JOB_COMPLETE state forces us to do an extra pause and read any items that got
    #   to Cloudwatch after the job was marked complete.

    # `wait` is tested first so that the status call is skipped entirely when the caller
    # only wants a one-shot dump of the logs.
    if wait and self.state() not in AwsQuantumJob.TERMINAL_STATES:
        log_state = AwsQuantumJob.LogState.TAILING
    else:
        log_state = AwsQuantumJob.LogState.COMPLETE

    log_group = AwsQuantumJob.LOG_GROUP
    stream_prefix = f"{self.name}/"
    stream_names = []  # The list of log streams
    positions = {}  # The current position in each stream, map of stream name -> position
    instance_count = 1  # currently only support a single instance
    has_streams = False
    color_wrap = logs.ColorWrap()

    while True:
        # Read logs: every state starts by printing whatever is currently available.
        has_streams = logs.flush_log_streams(
            self._aws_session,
            log_group,
            stream_prefix,
            stream_names,
            positions,
            instance_count,
            has_streams,
            color_wrap,
        )

        # COMPLETE is "Read logs, Exit": leave without pausing.
        if log_state == AwsQuantumJob.LogState.COMPLETE:
            break

        # TAILING and JOB_COMPLETE both pause after reading.
        time.sleep(poll_interval_seconds)

        if log_state == AwsQuantumJob.LogState.JOB_COMPLETE:
            # One extra read follows to pick up entries that arrived after completion.
            log_state = AwsQuantumJob.LogState.COMPLETE
        elif self.state() in AwsQuantumJob.TERMINAL_STATES:
            log_state = AwsQuantumJob.LogState.JOB_COMPLETE