odps/models/xflows.py (200 lines of code) (raw):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import time
from collections import OrderedDict

from .. import compat, errors, options, serializers
from ..compat import six
from .core import Iterable, XMLRemoteModel
from .instance import Instance
from .xflow import XFlow

logger = logging.getLogger(__name__)


class XFlows(Iterable):
    """Container of XFlow definitions (``odpsalgo`` nodes) under a parent.

    Provides listing, creation, update and deletion of XFlow definitions,
    plus helpers to submit XFlow instances and inspect their results and
    sub-instances.
    """

    # Paging marker echoed back by the server; an empty value ends listing.
    marker = serializers.XMLNodeField("Marker")
    max_items = serializers.XMLNodeField("MaxItems")
    xflows = serializers.XMLNodesReferencesField(XFlow, "odpsalgo")

    def _get(self, name):
        """Build an XFlow handle named ``name`` bound to this container."""
        return XFlow(client=self._client, parent=self, name=name)

    def __contains__(self, item):
        """Return True if ``item`` (a name or an XFlow) exists on the server.

        Existence is probed by reloading the object; ``errors.NoSuchObject``
        means it is absent. Items of any other type are never contained.
        """
        if isinstance(item, six.string_types):
            xflow = self._get(item)
        elif isinstance(item, XFlow):
            xflow = item
        else:
            return False

        try:
            xflow.reload()
            return True
        except errors.NoSuchObject:
            return False

    def __iter__(self):
        return self.iterate()

    def iterate(self, owner=None):
        """Yield every XFlow, following the server's paging marker.

        :param owner: optional owner filter, sent as the ``owner`` query
            parameter.
        """
        params = dict()
        if owner is not None:
            params["owner"] = owner

        def _it():
            # Fetch one page. Once a previous page has returned an empty
            # marker there is nothing left, so return None to stop.
            last_marker = params.get("marker")
            if "marker" in params and (last_marker is None or len(last_marker) == 0):
                return

            url = self.resource()
            resp = self._client.get(url, params=params)

            inst = XFlows.parse(self._client, resp, obj=self)
            params["marker"] = inst.marker

            return inst.xflows

        while True:
            xflows = _it()
            if xflows is None:
                break
            for xflow in xflows:
                yield xflow

    def create(self, xml_source):
        """Create an XFlow from its XML definition ``xml_source``."""
        url = self.resource()
        headers = {"Content-Type": "application/xml"}
        self._client.post(url, xml_source, headers=headers)

    def delete(self, name):
        """Delete an XFlow given either its name or an XFlow object."""
        if not isinstance(name, XFlow):
            xflow = self._get(name)
        else:
            xflow = name
            name = name.name
        # NOTE(review): presumably evicts this container's cached entry;
        # Iterable.__delitem__ is not visible here.
        del self[name]

        url = xflow.resource()
        self._client.delete(url)

    def update(self, xflow):
        """Upload ``xflow.xml_source`` as the new definition of ``xflow``.

        The object is reloaded after the request and returned.
        """
        url = xflow.resource()
        headers = {"Content-Type": "application/xml"}
        # Pass headers by keyword (as create() does) instead of relying on
        # the positional order of the client's ``put`` signature.
        self._client.put(url, xflow.xml_source, headers=headers)

        xflow.reload()
        return xflow

    class XFlowInstance(XMLRemoteModel):
        """Serializable description of one XFlow run (``XflowInstance``)."""

        __slots__ = (
            "xflow_project",
            "xflow_name",
            "parameters",
            "priority",
            "properties",
        )

        _root = "XflowInstance"

        xflow_project = serializers.XMLNodeField("Project")
        xflow_name = serializers.XMLNodeField("Xflow")
        parameters = serializers.XMLNodePropertiesField(
            "Parameters", "Parameter", key_tag="Key", value_tag="Value", required=True
        )
        priority = serializers.XMLNodeField(
            "Priority", parse_callback=int, serialize_callback=int
        )
        # Serialized under <Config> as <Property><Name/><Value/></Property>.
        properties = serializers.XMLNodePropertiesField(
            "Config", "Property", key_tag="Name", value_tag="Value"
        )

    class AnonymousSubmitXFlowInstance(XMLRemoteModel):
        """``<Instance>`` envelope wrapping an XFlowInstance for submission."""

        _root = "Instance"

        instance = serializers.XMLNodeReferenceField(
            "XFlows.XFlowInstance", "XflowInstance"
        )

    @staticmethod
    def _gen_xflow_instance_xml(xflow_instance=None, **kw):
        """Serialize an XFlowInstance (built from ``kw`` if not given)."""
        if xflow_instance is None:
            xflow_instance = XFlows.XFlowInstance(**kw)

        inst = XFlows.AnonymousSubmitXFlowInstance(instance=xflow_instance)
        return inst.serialize()

    @staticmethod
    def _build_run_properties(properties, hints):
        """Return a new OrderedDict of run properties.

        Merge order (later wins): ``properties``, ``hints``,
        ``options.ml.xflow_settings``, then ``options.biz_id``. When
        ``options.default_task_settings`` is set, it is merged under any
        JSON ``settings`` already present and re-encoded with every value
        converted to a string.
        """
        # Copy so the caller's ``properties`` mapping is never mutated.
        props = OrderedDict(properties or ())
        props.update(hints or {})

        if options.ml.xflow_settings:
            props.update(options.ml.xflow_settings)
        if options.biz_id:
            props["biz_id"] = str(options.biz_id)

        if options.default_task_settings:
            settings = options.default_task_settings.copy()
            # Explicitly supplied settings override the global defaults.
            exist_settings = json.loads(
                props.get("settings") or "{}", object_pairs_hook=OrderedDict
            )
            settings.update(exist_settings)

            str_settings = OrderedDict()
            for k, v in settings.items():
                if isinstance(v, six.string_types):
                    str_settings[k] = v
                elif isinstance(v, bool):
                    str_settings[k] = "true" if v else "false"
                else:
                    str_settings[k] = str(v)
            props["settings"] = json.dumps(str_settings)
        return props

    @staticmethod
    def _qualify_parameters(project, parameters):
        """Return a copy of ``parameters`` with bare names project-qualified.

        A ``modelName`` without ``/`` becomes
        ``<project>/offlinemodels/<name>``. An ``inputTableName`` or
        ``outputTableName`` without ``.`` becomes ``<project>.<name>``.
        Non-string values are passed through unchanged.
        """
        new_params = OrderedDict()
        for k, v in six.iteritems(parameters):
            if isinstance(v, six.string_types):
                if k == "modelName" and "/" not in v:
                    v = "%s/offlinemodels/%s" % (project.name, v)
                elif k in ("inputTableName", "outputTableName") and "." not in v:
                    v = "%s.%s" % (project.name, v)
            new_params[k] = v
        return new_params

    def run_xflow(
        self, xflow_instance=None, project=None, hints=None, parameters=None, **kw
    ):
        """Submit an XFlow instance and return the created instance.

        :param xflow_instance: a prebuilt ``XFlows.XFlowInstance``. When it
            is given, ``parameters`` and ``kw`` are not serialized.
        :param project: target project; defaults to ``self.parent``.
        :param hints: extra properties merged over ``kw["properties"]``.
        :param parameters: XFlow parameters; bare model/table names are
            qualified with ``project.name``.
        :param kw: other XFlowInstance fields (e.g. ``xflow_name``,
            ``xflow_project``, ``priority``, ``properties``).
        :raises: whatever ``project.instances.create`` raises; the job XML is
            logged first.
        """
        project = project or self.parent

        props = self._build_run_properties(kw.get("properties"), hints)
        if props:
            kw["properties"] = props

        if parameters:
            parameters = self._qualify_parameters(project, parameters)

        inst_xml = self._gen_xflow_instance_xml(
            xflow_instance=xflow_instance, parameters=parameters, **kw
        )

        try:
            return project.instances.create(xml=inst_xml)
        except Exception:
            logger.error("Failed to create xflow instance. Job XML:\n%s", inst_xml)
            raise

    class XFlowResult(XMLRemoteModel):
        """Parsed ``xresult`` response: the list of actions of a run."""

        class XFlowAction(XMLRemoteModel):
            """One action of an XFlow run (node type, instance id, result)."""

            node_type = serializers.XMLNodeAttributeField(".", attr="NodeType")
            instance_id = serializers.XMLNodeField("InstanceId")
            name = serializers.XMLNodeField("Name")
            result = serializers.XMLNodeReferenceField(
                Instance.InstanceResult, "Result"
            )

        actions = serializers.XMLNodesReferencesField(XFlowAction, "Actions", "Action")

    def get_xflow_results(self, instance):
        """Return ``{action name: XFlowAction}`` for an XFlow ``instance``."""
        url = instance.resource()
        resp = self._client.get(url, action="xresult")

        xflow_result = XFlows.XFlowResult.parse(self._client, resp)
        return {action.name: action for action in xflow_result.actions}

    def get_xflow_source(self, instance):
        """Return the raw ``xsource`` response body of ``instance``."""
        return self._client.get(instance.resource(), action="xsource").content

    def get_xflow_instance(self, instance):
        """Parse the submitted XFlowInstance of ``instance``.

        :raises errors.ODPSError: if the source is not parseable XML.
        """
        content = self.get_xflow_source(instance)
        try:
            inst = XFlows.AnonymousSubmitXFlowInstance.parse(self._client, content)
            return inst.instance
        except compat.ElementTreeParseError as e:
            raise errors.ODPSError(e)

    def get_xflow_sub_instances(self, instance):
        """Map action names to the instances launched by ``instance``.

        ``Instance`` actions map directly. ``SubWorkflow`` actions are
        expanded recursively into the same flat mapping. Other node types
        (e.g. ``Local``) are skipped.
        """
        inst_dict = OrderedDict()
        for x_result in six.itervalues(self.get_xflow_results(instance)):
            if x_result.node_type == "Instance":
                inst_dict[x_result.name] = self.parent.instances[x_result.instance_id]
            elif x_result.node_type == "SubWorkflow":
                sub_instance = self.parent.instances[x_result.instance_id]
                inst_dict.update(self.get_xflow_sub_instances(sub_instance))
        return inst_dict

    def iter_xflow_sub_instances(self, instance, interval=1, check=False):
        """Yield ``(action name, instance)`` pairs as sub-instances appear.

        Polls every ``interval`` seconds until ``instance`` terminates. Each
        sub-instance id is yielded once. A final poll happens after
        termination, so sub-instances started since the previous poll are
        still reported. A KeyboardInterrupt while waiting stops polling.
        If ``check`` is true, ``instance.wait_for_success`` is called at
        the end.
        """
        inst_id_set = set()
        while True:
            # Read the status *before* listing, so the last listing is
            # taken after termination has been observed.
            terminated = instance.is_terminated(retry=True)

            sub_tasks_result = self.get_xflow_sub_instances(instance)
            for k, v in six.iteritems(sub_tasks_result):
                if v.id not in inst_id_set:
                    inst_id_set.add(v.id)
                    yield k, v

            if terminated:
                break
            try:
                time.sleep(interval)
            except KeyboardInterrupt:
                break

        if check:
            instance.wait_for_success(interval=interval)

    def is_xflow_instance(self, instance):
        """Return True if ``instance`` has a parseable XFlow source."""
        try:
            self.get_xflow_instance(instance)
            return True
        except errors.ODPSError:
            return False