odps/models/tasks/core.py (123 lines of code) (raw):

# Copyright 1999-2024 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import importlib import itertools import json import textwrap from collections import OrderedDict from ... import errors, serializers from ...compat import six from ..core import AbstractXMLRemoteModel _type_to_task_classes = dict() class Task(AbstractXMLRemoteModel): __slots__ = ("name", "comment", "properties") _type_indicator = "type" name = serializers.XMLNodeField("Name") type = serializers.XMLTagField(".") comment = serializers.XMLNodeField("Comment") properties = serializers.XMLNodePropertiesField( "Config", "Property", key_tag="Name", value_tag="Value" ) @classmethod def _load_task_classes(cls): if _type_to_task_classes: return mod = importlib.import_module("odps.models.tasks") for v in six.itervalues(mod.__dict__): if not isinstance(v, type) or not issubclass(v, Task) or v is Task: continue cls_type = getattr(v, "_root", v.__name__) _type_to_task_classes[cls_type] = v def __new__(cls, *args, **kwargs): typo = kwargs.get("type") if typo is not None: cls._load_task_classes() task_cls = _type_to_task_classes.get(typo, cls) else: task_cls = cls return object.__new__(task_cls) def set_property(self, key, value): if self.properties is None: self.properties = OrderedDict() self.properties[key] = value def _update_property_json(self, field, value): def update(kv, dest): if not kv: return for k, v in six.iteritems(kv): if isinstance(v, bool): dest[k] = "true" if v else "false" else: dest[k] = str(v) if self.properties is None: self.properties = OrderedDict() if field in self.properties: settings = json.loads(self.properties[field]) else: settings = OrderedDict() update(value, settings) self.properties[field] = json.dumps(settings) def update_settings(self, value): self._update_property_json("settings", value) def serialize(self): if type(self) is Task: raise errors.ODPSError("Unknown task type") return super(Task, self).serialize() @property def instance(self): return self.parent.parent @property def progress(self): """ Get progress of a task. """ return self.instance.get_task_progress(self.name) @property def stages(self): """ Get execution stages of a task. """ return self.instance.get_task_progress(self.name).stages @property def result(self): """ Get execution result of the task. """ return self.instance.get_task_result(self.name) @property def summary(self): """ Get execution summary of the task. """ return self.instance.get_task_summary(self.name) @property def detail(self): """ Get execution details of the task. """ return self.instance.get_task_detail(self.name) @property def quota(self): """ Get quota json of the task. """ return self.instance.get_task_quota(self.name) @property def workers(self): """ Get workers of the task. """ return self.instance.get_task_workers(self.name) def get_info(self, key, raise_empty=False): """ Get associated information of the task. """ return self.instance.get_task_info(self.name, key, raise_empty=raise_empty) def put_info(self, key, value, raise_empty=False): """ Put associated information of the task. """ return self.instance.put_task_info( self.name, key, value, raise_empty=raise_empty ) def format_cdata(query, semicolon=False): stripped_query = query.strip() if semicolon and not stripped_query.endswith(";"): stripped_query += ";" return "<![CDATA[%s]]>" % stripped_query def build_execute_method(func, head_docstr): ext_wrapper = None unwrap_func = func if isinstance(func, classmethod): unwrap_func = func.__func__ ext_wrapper = classmethod @six.wraps(unwrap_func) def wrapped(cls, *args, **kw): inst = unwrap_func(cls, *args, **kw) inst.wait_for_success() return inst wrapped.__name__ = unwrap_func.__name__.replace("run_", "execute_") dent_count = min( len(list(itertools.takewhile(lambda c: c == " ", line))) for line in unwrap_func.__doc__.splitlines() if line.strip() ) _, rest_doc = textwrap.dedent(unwrap_func.__doc__).split("\n\n", 1) doc = "\n" + head_docstr.strip() + "\n\n" + rest_doc wrapped.__doc__ = "\n".join( " " * dent_count + line if line else "" for line in doc.splitlines() ) if ext_wrapper is not None: wrapped = ext_wrapper(wrapped) return wrapped