# odps/ui/progress.py
# -*- coding: utf-8 -*-
# Copyright 1999-2024 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import logging
import time
import uuid
from collections import OrderedDict
from ..compat import six
from ..config import options
from ..errors import InternalServerError, RequestTimeTooSkewed
from ..models.instance import Instance
from ..serializers import JSONNodeField, JSONNodesReferencesField, JSONSerializableModel
from .common import build_trait
# Retry policy for best-effort progress refreshes.
# NOTE(review): PROGRESS_RETRY itself appears unused in this module --
# reload_instance_status relies on options.retry_times instead; confirm
# before removing.
PROGRESS_RETRY = 3
PROGRESS_RETRY_DELAY = 0.15  # seconds slept between retry attempts
logger = logging.getLogger(__name__)
"""
Progress Storage
"""
# Module-level registry: group key -> _InstancesProgressJSON for that group.
PROGRESS_REPO = dict()
class _StageProgressJSON(JSONSerializableModel):
    """JSON-serializable snapshot of one execution stage's worker counts.

    All numeric fields default to 0 and are parsed as ints.  The explicit
    no-op ``__init__`` of the original (which only forwarded to ``super``)
    has been removed as dead code; the inherited constructor is used.
    """

    name = JSONNodeField("name")
    backup_workers = JSONNodeField("backup_workers", parse_callback=int, default=0)
    terminated_workers = JSONNodeField(
        "terminated_workers", parse_callback=int, default=0
    )
    running_workers = JSONNodeField("running_workers", parse_callback=int, default=0)
    total_workers = JSONNodeField("total_workers", parse_callback=int, default=0)
    input_records = JSONNodeField("input_records", parse_callback=int, default=0)
    output_records = JSONNodeField("output_records", parse_callback=int, default=0)
    finished_percentage = JSONNodeField(
        "finished_percentage", parse_callback=int, default=0
    )
class _TaskProgressJSON(JSONSerializableModel):
    """JSON-serializable snapshot of one task: status plus its stages."""

    name = JSONNodeField("name")
    status = JSONNodeField(
        "status",
        parse_callback=lambda v: Instance.Task.TaskStatus(v.upper()),
        serialize_callback=lambda v: v.value,
    )
    stages = JSONNodesReferencesField(_StageProgressJSON, "stages")

    def get_stage_progress_formatted_string(self):
        """Render the task's stage progress as one tab-separated line."""
        parts = [datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), " "]
        for stage in self.stages:
            if stage.backup_workers > 0:
                backup_note = "(+%s backups)" % stage.backup_workers
            else:
                backup_note = ""
            parts.append(
                "{0}:{1}/{2}/{3}{4}[{5}%]\t".format(
                    stage.name,
                    stage.running_workers,
                    stage.terminated_workers,
                    stage.total_workers,
                    backup_note,
                    stage.finished_percentage,
                )
            )
        return "".join(parts)
class _InstanceProgressJSON(JSONSerializableModel):
    """JSON-serializable snapshot of a sub-instance and its tasks."""

    id = JSONNodeField("id")
    logview = JSONNodeField("logview")
    status = JSONNodeField(
        "status",
        parse_callback=lambda v: Instance.Status(v.upper()),
        serialize_callback=lambda v: v.value,
    )
    tasks = JSONNodeField(
        "tasks",
        parse_callback=lambda v: _InstanceProgressJSON._parse_tasks(v),
        serialize_callback=lambda v: [d.serial() for d in six.itervalues(v)],
    )

    @staticmethod
    def _parse_tasks(obj):
        # Preserve payload order: task name -> parsed task JSON.
        parsed = OrderedDict()
        for item in obj:
            parsed[item["name"]] = _TaskProgressJSON.parse(item)
        return parsed
class _InstancesProgressJSON(JSONSerializableModel):
    """JSON-serializable progress of a named group of instances."""

    name = JSONNodeField("name")
    key = JSONNodeField("key")
    gen_time = JSONNodeField("gen_time")
    logview = JSONNodeField("logview")
    instances = JSONNodeField(
        "instances",
        parse_callback=lambda v: _InstancesProgressJSON._parse_instances(v),
        serialize_callback=lambda v: [d.serial() for d in six.itervalues(v)],
    )

    @staticmethod
    def _parse_instances(obj):
        # Preserve payload order: instance id -> parsed instance JSON.
        parsed = OrderedDict()
        for item in obj:
            parsed[item["id"]] = _InstanceProgressJSON.parse(item)
        return parsed

    def update_instance(self, inst):
        """Insert or replace the record for *inst*, keyed by its id."""
        self.instances[inst.id] = inst
def create_instance_group(name):
    """Register a new, empty progress group and return its unique key."""
    group_key = "%x_%s" % (int(time.time()), str(uuid.uuid4()).lower())
    group = _InstancesProgressJSON(name=name, key=group_key, instances=OrderedDict())
    group.gen_time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    PROGRESS_REPO[group_key] = group
    return group_key
# Cache of instance_id -> logview URL so each address is fetched only once.
_logview_cache = dict()
def _reload_instance_status(odps, group_id, instance_id):
    """Refresh the cached status, tasks and stages of one instance.

    Looks up (or creates) the instance entry inside the registered group
    *group_id*, then pulls fresh status/task/stage data from the service
    via *odps*.  Raises KeyError when *group_id* was never registered.
    Instances already marked TERMINATED are left untouched.
    """
    if group_id not in PROGRESS_REPO:
        raise KeyError("Instance group ID not exist.")
    group_json = PROGRESS_REPO[group_id]
    if instance_id in group_json.instances:
        inst_json = group_json.instances[instance_id]
        # Nothing new can happen once the instance has terminated.
        if inst_json.status == Instance.Status.TERMINATED:
            return
    else:
        inst_json = _InstanceProgressJSON(id=instance_id, tasks=dict())
        group_json.instances[instance_id] = inst_json
    group_json.gen_time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    old_status = inst_json.status
    sub_inst = odps.get_instance(instance_id)
    inst_json.status = sub_inst.status
    # Logview addresses are fetched once per instance and cached.
    if instance_id not in _logview_cache:
        _logview_cache[instance_id] = sub_inst.get_logview_address()
    inst_json.logview = _logview_cache[instance_id]
    if old_status != Instance.Status.TERMINATED:
        for task_name, task in six.iteritems(sub_inst.get_task_statuses()):
            if task_name in inst_json.tasks:
                task_json = inst_json.tasks[task_name]
                task_json.status = task.status
                # For known tasks, only RUNNING/WAITING ones get their
                # stage progress re-fetched; other states are just updated.
                if task.status not in set(
                    [Instance.Task.TaskStatus.RUNNING, Instance.Task.TaskStatus.WAITING]
                ):
                    continue
            else:
                task_json = _TaskProgressJSON(
                    name=task_name, status=task.status, stages=[]
                )
                inst_json.tasks[task_name] = task_json
            task_json.stages = []
            try:
                task_prog = sub_inst.get_task_progress(task_name)
            except Exception:
                # Progress fetching is best-effort: keep the task entry
                # (with empty stages) and move on to the next task.
                continue
            for stage in task_prog.stages:
                stage_json = _StageProgressJSON()
                # Copy only the declared fields the stage object exposes,
                # skipping missing attributes and None values.
                for field_name in six.iterkeys(_StageProgressJSON.__fields):
                    if hasattr(stage, field_name):
                        val = getattr(stage, field_name)
                        if val is not None:
                            setattr(stage_json, field_name, val)
                task_json.stages.append(stage_json)
def reload_instance_status(odps, group_id, instance_id):
    """Refresh instance progress, retrying transient service errors.

    Retries up to ``options.retry_times`` attempts on
    InternalServerError / RequestTimeTooSkewed, sleeping briefly between
    attempts; gives up silently after the last failure.
    """
    total = options.retry_times
    for attempt in range(total):
        try:
            return _reload_instance_status(odps, group_id, instance_id)
        except (InternalServerError, RequestTimeTooSkewed):
            if attempt + 1 < total:
                time.sleep(PROGRESS_RETRY_DELAY)
def fetch_instance_group(group_id):
    """Return the registered progress group, raising KeyError if absent."""
    if group_id in PROGRESS_REPO:
        return PROGRESS_REPO[group_id]
    raise KeyError("Instance group ID not exist.")
def exist_instance_group(group_id):
    """Return True when a progress group with *group_id* is registered."""
    return group_id in PROGRESS_REPO
"""
User Interface
"""
try:
    from ..console import (
        in_ipython_frontend,
        ipython_major_version,
        is_widgets_available,
        widgets,
    )

    if ipython_major_version < 4:
        from IPython.utils.traitlets import List, Unicode
    else:
        from traitlets import Unicode, List  # noqa: F401
    from IPython.display import display
except Exception:
    # IPython / widget stack unavailable: disable the UI widget entirely.
    InstancesProgress = None
else:
    if widgets and in_ipython_frontend():

        class InstancesProgress(widgets.DOMWidget):
            """Front-end widget that renders grouped instance progress.

            All ``send`` calls are best-effort: communication failures
            with the front end are swallowed so they never break the
            computation being monitored.  (Fixed: the original used bare
            ``except:``, which also swallowed SystemExit and
            KeyboardInterrupt; narrowed to ``except Exception``.)
            """

            _view_name = build_trait(Unicode, "InstancesProgress", sync=True)
            _view_module = build_trait(Unicode, "pyodps/progress", sync=True)
            prefix = build_trait(Unicode, "prefix", sync=True)
            suffix = build_trait(Unicode, "suffix", sync=True)

            def __init__(self, **kwargs):
                """Constructor"""
                widgets.DOMWidget.__init__(self, **kwargs)  # Call the base.
                # Allow the user to register error callbacks with the following signatures:
                self.errors = widgets.CallbackDispatcher(accepted_nargs=[0, 1])

            def update(self):
                """Ask the front end to redraw with no new content."""
                self.send(json.dumps(dict(action="update", content=[])))

            def update_group(self, group_jsons):
                """Push serialized group payload(s) to the front end."""
                if isinstance(group_jsons, six.string_types):
                    group_jsons = [group_jsons]
                try:
                    self.send(json.dumps(dict(action="update", content=group_jsons)))
                except Exception:
                    # Best-effort delivery; never propagate UI errors.
                    pass

            def delete_group(self, group_keys):
                """Remove the given group key(s) from the front end."""
                if isinstance(group_keys, six.string_types):
                    group_keys = [group_keys]
                try:
                    self.send(json.dumps(dict(action="delete", content=group_keys)))
                except Exception:
                    # Best-effort delivery; never propagate UI errors.
                    pass

            def clear_groups(self):
                """Remove every group from the front end."""
                try:
                    self.send(json.dumps(dict(action="clear")))
                except Exception:
                    # Best-effort delivery; never propagate UI errors.
                    pass

    else:
        InstancesProgress = None
class ProgressGroupUI(object):
    """Tracks a set of progress-group keys and mirrors them to a widget.

    When constructed with ``ipython_widget=False`` no widget is ever
    created.  Fixed: the original ``update()``, ``remove_keys()`` and
    ``clear_keys()`` dereferenced ``self._widget`` unconditionally, which
    raised AttributeError in that mode because the widget stays None;
    widget operations are now guarded and become no-ops without a widget.
    """

    def __init__(self, ipython_widget=False):
        self._ipython_widget = ipython_widget
        if ipython_widget and InstancesProgress is None:
            raise RuntimeError("Cannot create group ui when InstancesProgress is None")

        self._widget = None
        self._group_keys = set()
        self._prefix = ""
        self._suffix = ""

    @property
    def prefix(self):
        return self._prefix

    @prefix.setter
    def prefix(self, value):
        self._prefix = value
        self._update_text()

    @property
    def suffix(self):
        return self._suffix

    @suffix.setter
    def suffix(self, value):
        self._suffix = value
        self._update_text()

    def has_keys(self, keys):
        """Return True when every key in *keys* is tracked by this UI."""
        if isinstance(keys, six.string_types):
            keys = [keys]
        return all(k in self._group_keys for k in keys)

    def add_keys(self, keys):
        """Start tracking *keys* and push their data to the widget."""
        if isinstance(keys, six.string_types):
            keys = [keys]
        self._group_keys.update(keys)
        self._update_group(keys)

    def remove_keys(self, keys):
        """Stop tracking *keys* and remove them from the widget, if any."""
        if isinstance(keys, six.string_types):
            keys = [keys]
        self._group_keys -= set(keys)
        if self._widget is not None:
            self._widget.delete_group(keys)

    def clear_keys(self):
        """Drop every tracked key and clear the widget, if any."""
        self._group_keys = set()
        if self._widget is not None:
            self._widget.clear_groups()

    def _ensure_widget(self):
        # Lazily create and display the widget (only called in widget mode).
        if not self._widget:
            self._widget = InstancesProgress()
            if is_widgets_available():
                display(self._widget)

    def _update_text(self):
        if self._ipython_widget:
            self._ensure_widget()
            self._widget.prefix = self._prefix
            self._widget.suffix = self._suffix
            self._widget.update()

    def _update_group(self, keys):
        if self._ipython_widget:
            self._ensure_widget()
            if isinstance(keys, six.string_types):
                keys = [keys]
            data = [
                fetch_instance_group(key).serialize()
                for key in keys
                if exist_instance_group(key)
            ]
            self._widget.update_group(data)

    def update(self):
        """Refresh the widget's text and all tracked groups."""
        self._update_text()
        if self._widget is not None:
            data = [
                fetch_instance_group(key).serialize()
                for key in self._group_keys
                if exist_instance_group(key)
            ]
            self._widget.update_group(data)

    def close(self):
        """Close the underlying widget if one was created."""
        if self._ipython_widget and self._widget:
            self._widget.close()