def logs_for_processing_job()

in src/sagemaker/session.py [0:0]


    def logs_for_processing_job(self, job_name, wait=False, poll=10):
        """Display logs for a processing job, optionally tailing them until the job is complete.

        Args:
            job_name (str): Name of the processing job to display the logs for.
            wait (bool): Whether to keep looking for new log entries until the job completes
                (default: False).
            poll (int): The interval in seconds between polling for new log entries
                (default: 10). The job status itself is re-queried at most once every
                30 seconds, regardless of this value.

        Raises:
            ValueError: If the processing job fails.
        """
        job_desc = self.sagemaker_client.describe_processing_job(ProcessingJobName=job_name)

        log_context = _logs_init(self, job_desc, job="Processing")
        instance_count, stream_names, positions, client, log_group, dot, color_wrap = log_context

        log_state = _get_initial_job_state(job_desc, "ProcessingJobStatus", wait)

        # Statuses after which the job will not produce a new status.
        finished_statuses = ("Completed", "Failed", "Stopped")
        # Minimum number of seconds between two describe_processing_job calls in the loop.
        describe_interval = 30

        # The loop is a small state machine that alternates between draining the log streams
        # and (when tailing) refreshing the job status. When called with wait == False the
        # job status is never refreshed.
        #
        # Initial state:
        #   wait == True and the job is not completed -> TAILING
        #   wait == False                             -> COMPLETE (whether or not the job
        #                                                really is complete)
        #
        # Transitions:
        #
        # STATE               ACTIONS                        CONDITION             NEW STATE
        # ----------------    ----------------               -----------------     ----------------
        # TAILING             Read logs, Pause, Get status   Job complete          JOB_COMPLETE
        #                                                    Else                  TAILING
        # JOB_COMPLETE        Read logs, Pause               Any                   COMPLETE
        # COMPLETE            Read logs, Exit                                      N/A
        #
        # JOB_COMPLETE exists so that one extra pause-and-read picks up entries that reached
        # Cloudwatch after the job was marked complete.
        last_described_at = time.time()
        while True:
            _flush_log_streams(
                stream_names, instance_count, client, log_group, job_name, positions, dot, color_wrap
            )
            if log_state == LogState.COMPLETE:
                break

            time.sleep(poll)

            if log_state == LogState.JOB_COMPLETE:
                # The extra pause has happened; the next iteration does the final read.
                log_state = LogState.COMPLETE
                continue

            if time.time() - last_described_at < describe_interval:
                # Too soon to query the job status again; keep reading logs.
                continue

            job_desc = self.sagemaker_client.describe_processing_job(ProcessingJobName=job_name)
            last_described_at = time.time()

            if job_desc["ProcessingJobStatus"] in finished_statuses:
                print()
                log_state = LogState.JOB_COMPLETE

        if not wait:
            return

        # Validate the final outcome using the most recently fetched description.
        self._check_job_status(job_name, job_desc, "ProcessingJobStatus")
        if dot:
            print()