in metaflow/plugins/kubernetes/kubernetes.py [0:0]
def wait(self, stdout_location, stderr_location, echo=None):
    def update_delay(secs_since_start):
        # this sigmoid function reaches
        # - 0.1 after 11 minutes
        # - 0.5 after 15 minutes
        # - 1.0 after 23 minutes
        # in other words, the user will see very frequent updates
        # during the first 10 minutes
        sigmoid = 1.0 / (1.0 + math.exp(-0.01 * secs_since_start + 9.0))
        # return the polling delay in seconds: ~0.5s right after launch,
        # growing toward ~30.5s as the task keeps waiting
        return 0.5 + sigmoid * 30.0
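# --- Illustrative sketch (not part of kubernetes.py) ---
# A quick standalone check of the polling-delay curve described above.
# update_delay is reproduced verbatim; the printed values are approximate.
import math

def update_delay(secs_since_start):
    sigmoid = 1.0 / (1.0 + math.exp(-0.01 * secs_since_start + 9.0))
    return 0.5 + sigmoid * 30.0

for minutes in (0, 11, 15, 23, 60):
    print("t=%2d min -> sleep ~%.1fs" % (minutes, update_delay(minutes * 60)))
# t= 0 min -> sleep ~0.5s   (sigmoid ~ 0.0001)
# t=11 min -> sleep ~3.0s   (sigmoid ~ 0.08)
# t=15 min -> sleep ~15.5s  (sigmoid = 0.5)
# t=23 min -> sleep ~30.3s  (sigmoid ~ 0.99)
# t=60 min -> sleep ~30.5s  (sigmoid ~ 1.0)
# --- end sketch ---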
    def wait_for_launch(job):
        status = job.status
        echo(
            "Task is starting (%s)..." % status,
            "stderr",
            job_id=job.id,
        )
        t = time.time()
        start_time = time.time()
        while job.is_waiting:
            new_status = job.status
            if status != new_status or (time.time() - t) > 30:
                status = new_status
                echo(
                    "Task is starting (%s)..." % status,
                    "stderr",
                    job_id=job.id,
                )
                t = time.time()
            time.sleep(update_delay(time.time() - start_time))
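# --- Illustrative sketch (not part of kubernetes.py) ---
# How the launch-polling loop above behaves: the status line is re-echoed only
# when the status string changes or 30 seconds have passed, and the sleep
# between polls follows update_delay(). The _FakeJob stub below is hypothetical
# and only mimics the .status / .is_waiting attributes used by the loop.
import time

class _FakeJob:
    def __init__(self):
        self._polls = 0

    @property
    def status(self):
        return "Pending" if self._polls < 3 else "Running"

    @property
    def is_waiting(self):
        self._polls += 1
        return self._polls < 5

job = _FakeJob()
status = job.status
print("Task is starting (%s)..." % status)
t = start_time = time.time()
while job.is_waiting:
    new_status = job.status
    if status != new_status or (time.time() - t) > 30:
        status = new_status
        print("Task is starting (%s)..." % status)
        t = time.time()
    time.sleep(0.01)  # stand-in for update_delay(time.time() - start_time)
# --- end sketch ---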
    prefix = lambda: b"[%s] " % util.to_bytes(self._job.id)
    stdout_tail = get_log_tailer(stdout_location, self._datastore.TYPE)
    stderr_tail = get_log_tailer(stderr_location, self._datastore.TYPE)

    # 1) Loop until the job has started
    wait_for_launch(self._job)
    # 2) Tail logs until the job has finished
    self._output_final_logs = False

    def _has_updates():
        if self._job.is_running:
            return True
        # Make sure to output final tail for a job that has finished.
        if not self._output_final_logs:
            self._output_final_logs = True
            return True
        return False

    tail_logs(
        prefix=prefix(),
        stdout_tail=stdout_tail,
        stderr_tail=stderr_tail,
        echo=echo,
        has_log_updates=_has_updates,
    )
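# --- Illustrative sketch (not part of kubernetes.py) ---
# _has_updates answers "should tail_logs keep polling?": it returns True while
# the job is running, then True exactly once more after the job stops so the
# final chunk of logs gets flushed. A tiny standalone demonstration of that
# contract (the _Tracker stub and the canned `running` sequence are hypothetical):
running = iter([True, True, False, False])

class _Tracker:
    output_final_logs = False

def has_updates():
    if next(running):
        return True
    if not _Tracker.output_final_logs:
        _Tracker.output_final_logs = True
        return True
    return False

print([has_updates() for _ in range(4)])  # [True, True, True, False]
# --- end sketch ---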
    # 3) Fetch remaining logs
    #
    # It is possible that we exit the loop above before all logs have been
    # shown.
    #
    # TODO : If we notice Kubernetes failing to upload logs to S3,
    #        we can add a HEAD request here to ensure that the file
    #        exists prior to calling S3Tail and notify the user about
    #        truncated logs if it doesn't.
    # TODO : For hard crashes, we can fetch logs from the pod.
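# --- Illustrative sketch (not part of kubernetes.py) ---
# One way the TODO's HEAD check could look, assuming stdout_location is an
# s3://bucket/key URL and boto3 credentials are available. This is a sketch of
# the idea, not the approach Metaflow ships.
import boto3
from urllib.parse import urlparse

def log_object_exists(s3_url):
    parsed = urlparse(s3_url)
    client = boto3.client("s3")
    try:
        client.head_object(Bucket=parsed.netloc, Key=parsed.path.lstrip("/"))
        return True
    except client.exceptions.ClientError:
        return False

# hypothetical usage before tailing:
# if not log_object_exists(stdout_location):
#     echo("Logs may be truncated; the log file was not found in S3.", "stderr")
# --- end sketch ---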
    if self._job.has_failed:
        exit_code, reason = self._job.reason
        msg = next(
            msg
            for msg in [
                reason,
                "Task crashed",
            ]
            if msg is not None
        )
        if exit_code:
            if int(exit_code) == 139:
                raise KubernetesException("Task failed with a segmentation fault.")
            if int(exit_code) == 137:
                raise KubernetesException(
                    "Task ran out of memory. "
                    "Increase the available memory by specifying "
                    "@resources(memory=...) for the step."
                )
            if int(exit_code) == 134:
                raise KubernetesException("%s (exit code %s)" % (msg, exit_code))
            else:
                msg = "%s (exit code %s)" % (msg, exit_code)
        raise KubernetesException(
            "%s. This could be a transient error. Use @retry to retry." % msg
        )
    exit_code, _ = self._job.reason
    echo(
        "Task finished with exit code %s." % exit_code,
        "stderr",
        job_id=self._job.id,
    )
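# --- Illustrative sketch (not part of kubernetes.py) ---
# The special-cased exit codes above follow the usual "128 + signal number"
# convention for containers terminated by a signal:
#   139 = 128 + 11 (SIGSEGV) -> segmentation fault
#   137 = 128 + 9  (SIGKILL) -> typically the OOM killer on Kubernetes
#   134 = 128 + 6  (SIGABRT) -> abort()
# A tiny helper along these lines (hypothetical, not part of Metaflow) maps a
# raw exit code back to the signal name:
import signal

def describe_exit_code(exit_code):
    if exit_code > 128:
        try:
            return "killed by %s" % signal.Signals(exit_code - 128).name
        except ValueError:
            pass
    return "exited with code %d" % exit_code

print(describe_exit_code(137))  # killed by SIGKILL
print(describe_exit_code(139))  # killed by SIGSEGV
print(describe_exit_code(1))    # exited with code 1
# --- end sketch ---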