in src/sagemaker/session.py [0:0]
def logs_for_processing_job(self, job_name, wait=False, poll=10):
    """Display logs for a given processing job, optionally tailing them until the job is complete.

    Args:
        job_name (str): Name of the processing job to display the logs for.
        wait (bool): Whether to keep looking for new log entries until the job completes
            (default: False).
        poll (int): The interval in seconds between polling for new log entries
            (default: 10). While tailing, the job status itself is re-fetched at most
            once every 30 seconds, independently of ``poll``.

    Raises:
        ValueError: If the processing job fails. The final status check is only
            performed when ``wait`` is True.
    """
    # Minimum number of seconds between DescribeProcessingJob calls while tailing.
    # Log streams are flushed every ``poll`` seconds, but the job status is refreshed
    # less often to limit API calls.
    describe_interval = 30

    description = self.sagemaker_client.describe_processing_job(ProcessingJobName=job_name)
    instance_count, stream_names, positions, client, log_group, dot, color_wrap = _logs_init(
        self, description, job="Processing"
    )

    state = _get_initial_job_state(description, "ProcessingJobStatus", wait)

    # The loop below implements a state machine that alternates between checking the job status
    # and reading whatever is available in the logs at this point. Note, that if we were
    # called with wait == False, we never check the job status.
    #
    # If wait == TRUE and job is not completed, the initial state is TAILING
    # If wait == FALSE, the initial state is COMPLETE (doesn't matter if the job really is
    # complete).
    #
    # The state table:
    #
    # STATE               ACTIONS                        CONDITION             NEW STATE
    # ----------------    ----------------               -----------------     ----------------
    # TAILING             Read logs, Pause, Get status   Job complete          JOB_COMPLETE
    #                                                    Else                  TAILING
    # JOB_COMPLETE        Read logs, Pause               Any                   COMPLETE
    # COMPLETE            Read logs, Exit                                      N/A
    #
    # Notes:
    # - "Get status" in TAILING is throttled: the job is only described again once at least
    #   ``describe_interval`` seconds have elapsed since the previous describe call.
    # - The JOB_COMPLETE state forces us to do an extra pause and read any items that got to
    #   Cloudwatch after the job was marked complete.
    last_describe_job_call = time.time()
    while True:
        _flush_log_streams(
            stream_names,
            instance_count,
            client,
            log_group,
            job_name,
            positions,
            dot,
            color_wrap,
        )
        if state == LogState.COMPLETE:
            break

        time.sleep(poll)

        if state == LogState.JOB_COMPLETE:
            state = LogState.COMPLETE
        elif time.time() - last_describe_job_call >= describe_interval:
            description = self.sagemaker_client.describe_processing_job(
                ProcessingJobName=job_name
            )
            last_describe_job_call = time.time()
            status = description["ProcessingJobStatus"]
            if status in ("Completed", "Failed", "Stopped"):
                # Presumably ends any in-progress line of output (e.g. progress dots)
                # before the final flush of log entries.
                print()
                state = LogState.JOB_COMPLETE

    if wait:
        # Uses the most recent description fetched above to validate the terminal status.
        self._check_job_status(job_name, description, "ProcessingJobStatus")
        if dot:
            print()