def wait()

in metaflow/plugins/kubernetes/kubernetes.py


    def wait(self, stdout_location, stderr_location, echo=None):
        def update_delay(secs_since_start):
            # this sigmoid function reaches
            # - 0.1 after 11 minutes
            # - 0.5 after 15 minutes
            # - 1.0 after 23 minutes
            # in other words, the user will see very frequent updates
            # during the first 10 minutes
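            # (the resulting polling delay ramps from ~0.5s right after
            # launch up to ~30.5s once the sigmoid saturates)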
            sigmoid = 1.0 / (1.0 + math.exp(-0.01 * secs_since_start + 9.0))
            return 0.5 + sigmoid * 30.0

        def wait_for_launch(job):
            status = job.status
            echo(
                "Task is starting (%s)..." % status,
                "stderr",
                job_id=job.id,
            )
            t = time.time()
            start_time = time.time()
            while job.is_waiting:
                new_status = job.status
                if status != new_status or (time.time() - t) > 30:
                    status = new_status
                    echo(
                        "Task is starting (%s)..." % status,
                        "stderr",
                        job_id=job.id,
                    )
                    t = time.time()
                time.sleep(update_delay(time.time() - start_time))

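        # Prefix every tailed log line with the Kubernetes job id so that
        # interleaved task output stays attributable.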
        prefix = lambda: b"[%s] " % util.to_bytes(self._job.id)

        stdout_tail = get_log_tailer(stdout_location, self._datastore.TYPE)
        stderr_tail = get_log_tailer(stderr_location, self._datastore.TYPE)

        # 1) Loop until the job has started
        wait_for_launch(self._job)

        # 2) Tail logs until the job has finished
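        # _output_final_logs guarantees one last tailing pass after the job
        # stops running, so log lines written just before exit are not dropped.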
        self._output_final_logs = False

        def _has_updates():
            if self._job.is_running:
                return True
            # Make sure to output final tail for a job that has finished.
            if not self._output_final_logs:
                self._output_final_logs = True
                return True
            return False

        tail_logs(
            prefix=prefix(),
            stdout_tail=stdout_tail,
            stderr_tail=stderr_tail,
            echo=echo,
            has_log_updates=_has_updates,
        )
        # 3) Fetch remaining logs
        #
        # It is possible that we exit the loop above before all logs have been
        # shown.
        #
        # TODO : If we notice Kubernetes failing to upload logs to S3,
        #        we can add a HEAD request here to ensure that the file
        #        exists prior to calling S3Tail and notify the user about
        #        truncated logs if it doesn't.
        # TODO : For hard crashes, we can fetch logs from the pod.
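        # Container exit codes follow the 128 + signal convention:
        # 139 = SIGSEGV, 137 = SIGKILL (typically an OOM kill), 134 = SIGABRT.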
        if self._job.has_failed:
            exit_code, reason = self._job.reason
            msg = next(
                msg
                for msg in [
                    reason,
                    "Task crashed",
                ]
                if msg is not None
            )
            if exit_code:
                if int(exit_code) == 139:
                    raise KubernetesException("Task failed with a segmentation fault.")
                if int(exit_code) == 137:
                    raise KubernetesException(
                        "Task ran out of memory. "
                        "Increase the available memory by specifying "
                        "@resource(memory=...) for the step. "
                    )
                if int(exit_code) == 134:
                    raise KubernetesException("%s (exit code %s)" % (msg, exit_code))
                else:
                    msg = "%s (exit code %s)" % (msg, exit_code)
            raise KubernetesException(
                "%s. This could be a transient error. Use @retry to retry." % msg
            )

        exit_code, _ = self._job.reason
        echo(
            "Task finished with exit code %s." % exit_code,
            "stderr",
            job_id=self._job.id,
        )
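
A quick way to sanity-check the polling cadence promised in the update_delay
comment is to evaluate the same formula at a few elapsed times. The snippet
below is a standalone sketch, not part of Metaflow; it only mirrors the
expression used inside wait().

    import math

    def update_delay(secs_since_start):
        # same formula as in wait(): the delay grows from ~0.5s to ~30.5s
        sigmoid = 1.0 / (1.0 + math.exp(-0.01 * secs_since_start + 9.0))
        return 0.5 + sigmoid * 30.0

    for minutes in (0, 5, 11, 15, 23, 30):
        print("%2d min -> %4.1fs between status checks"
              % (minutes, update_delay(minutes * 60)))
    # roughly: 0 min -> 0.5s, 11 min -> 3.0s, 15 min -> 15.5s,
    # 23 min -> 30.3s, plateauing near 30.5s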