odps/models/tasks/maxframe.py (50 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json from collections import OrderedDict from ... import serializers from ...compat import enum, six from ...config import options from .core import Task class MaxFrameTask(Task): __slots__ = ("_output_format", "_major_version", "_service_endpoint") _root = "MaxFrame" _anonymous_task_name = "AnonymousMaxFrameTask" class CommandType(enum.Enum): CREATE_SESSION = "CREATE_SESSION" PYTHON_PACK = "PYTHON_PACK" RAY_CLUSTER_INIT = "RAY_CLUSTER_INIT" RAY_CLUSTER_FREE = "RAY_CLUSTER_FREE" command = serializers.XMLNodeField( "Command", default=CommandType.CREATE_SESSION, parse_callback=lambda t: MaxFrameTask.CommandType(t.upper()), serialize_callback=lambda t: t.value, ) def __init__(self, **kwargs): kwargs["name"] = kwargs.get("name") or self._anonymous_task_name self._major_version = kwargs.pop("major_version", None) self._service_endpoint = kwargs.pop("service_endpoint", None) super(MaxFrameTask, self).__init__(**kwargs) if self.properties is None: self.properties = OrderedDict() self.properties["settings"] = "{}" def serial(self): if options.default_task_settings: settings = options.default_task_settings.copy() else: settings = OrderedDict() if self._major_version is not None: settings["odps.task.major.version"] = self._major_version if self._service_endpoint is not None: settings["odps.service.endpoint"] = self._service_endpoint if "settings" in self.properties: settings.update(json.loads(self.properties["settings"])) final_settings = OrderedDict() for k, v in settings.items(): if isinstance(v, six.string_types): final_settings[k] = v elif isinstance(v, bool): final_settings[k] = str(v).lower() else: final_settings[k] = str(v) self.properties["settings"] = json.dumps(final_settings) return super(MaxFrameTask, self).serial()