core/maxframe/udf.py (118 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import shlex
from typing import Callable, List, Optional, Union

from odps.models import Resource

from .serialization.serializables import (
    BoolField,
    DictField,
    FieldTypes,
    FunctionField,
    ListField,
    Serializable,
    StringField,
)
from .utils import tokenize


class PythonPackOptions(Serializable):
    """Serializable description of a set of Python requirements to pack.

    When no explicit ``key`` is supplied, one is derived by passing the set
    of requirements and the flags named in ``_key_args`` to ``tokenize``.
    """

    # Names of the boolean flags that take part in the derived key and repr.
    _key_args = ("force_rebuild", "prefer_binary", "pre_release", "no_audit_wheel")

    key = StringField("key")
    requirements = ListField("requirements", FieldTypes.string, default_factory=list)
    force_rebuild = BoolField("force_rebuild", default=False)
    prefer_binary = BoolField("prefer_binary", default=False)
    pre_release = BoolField("pre_release", default=False)
    pack_instance_id = StringField("pack_instance_id", default=None)
    no_audit_wheel = BoolField("no_audit_wheel", default=False)

    def __init__(self, key: Optional[str] = None, **kw):
        """Create the options object, deriving ``key`` when it is not given.

        Parameters
        ----------
        key
            Explicit key for this pack. If ``None``, a key is computed from
            the requirements and the ``_key_args`` flags.
        **kw
            Field values forwarded to ``Serializable.__init__``.
        """
        super().__init__(key=key, **kw)
        if self.key is None:
            args = {k: getattr(self, k) for k in self._key_args}
            # A set is used so the order of requirements is not part of the
            # value handed to tokenize.
            self.key = tokenize(set(self.requirements), args)

    def __repr__(self):
        args_str = " ".join(f"{k}={getattr(self, k)}" for k in self._key_args)
        return f"<PythonPackOptions {self.requirements} {args_str}>"


class MarkedFunction(Serializable):
    """Callable wrapper that carries UDF metadata next to the function.

    Calls and unknown attribute lookups are delegated to the wrapped
    ``func``; resources, python packs and running options are stored as
    serializable fields on the wrapper.
    """

    func = FunctionField("func")
    resources = ListField("resources", FieldTypes.string, default_factory=list)
    pythonpacks = ListField("pythonpacks", FieldTypes.reference, default_factory=list)
    expect_engine = StringField("expect_engine", default=None)
    expect_resources = DictField(
        "expect_resources", FieldTypes.string, default_factory=dict
    )

    def __init__(self, func: Optional[Callable] = None, **kw):
        super().__init__(func=func, **kw)

    def __getattr__(self, item):
        # Only reached when normal lookup fails: fall back to the wrapped
        # function so attributes such as ``__name__`` remain available.
        return getattr(self.func, item)

    def __call__(self, *args, **kw):
        return self.func(*args, **kw)

    def __repr__(self):
        return f"<MarkedFunction {self.func!r}>"


def with_resources(*resources: Union[str, Resource], use_wrapper_class: bool = True):
    """Build a decorator that attaches resources to a function.

    Parameters
    ----------
    *resources
        Resource names as strings, or ``Resource`` objects which are turned
        into ``project[/schemas/<schema>]/resources/<name>`` paths.
    use_wrapper_class
        When true (default) the function is wrapped in (or merged into) a
        ``MarkedFunction``. When false, a ``resources`` attribute is set on
        the function object itself and the function is returned unwrapped.

    Returns
    -------
    Callable
        A decorator taking the function to mark.
    """

    def res_to_str(res: Union[str, Resource]) -> str:
        """Return the string form of a resource."""
        if isinstance(res, str):
            return res
        res_parts = [res.project.name]
        if res.schema:
            res_parts.extend(["schemas", res.schema])
        res_parts.extend(["resources", res.name])
        return "/".join(res_parts)

    def func_wrapper(func):
        str_resources = [res_to_str(r) for r in resources]
        if not use_wrapper_class:
            # A plain function has no ``resources`` attribute the first time
            # it is decorated, so a default is required here; without it the
            # lookup raises AttributeError.
            existing = getattr(func, "resources", None) or []
            func.resources = existing + str_resources
            return func
        if isinstance(func, MarkedFunction):
            func.resources = func.resources + str_resources
            return func
        return MarkedFunction(func, resources=str_resources)

    return func_wrapper


def with_python_requirements(
    *requirements: str,
    force_rebuild: bool = False,
    prefer_binary: bool = False,
    pre_release: bool = False,
    no_audit_wheel: bool = False,
):
    """Build a decorator that attaches Python requirements to a function.

    Each requirement string is split with ``shlex.split``, so one argument
    may hold several whitespace-separated requirements.

    Parameters
    ----------
    *requirements
        Requirement strings.
    force_rebuild, prefer_binary, pre_release, no_audit_wheel
        Flags stored on the resulting ``PythonPackOptions``.

    Returns
    -------
    Callable
        A decorator that appends a ``PythonPackOptions`` to an existing
        ``MarkedFunction`` or wraps a plain function in a new one.
    """
    result_req = []
    for req in requirements:
        result_req.extend(shlex.split(req))

    def func_wrapper(func):
        pack_item = PythonPackOptions(
            # Use the split requirements, not the raw argument tuple. A copy
            # is passed so functions decorated by the same decorator object
            # do not share one list.
            requirements=list(result_req),
            force_rebuild=force_rebuild,
            prefer_binary=prefer_binary,
            pre_release=pre_release,
            no_audit_wheel=no_audit_wheel,
        )
        if isinstance(func, MarkedFunction):
            func.pythonpacks.append(pack_item)
            return func
        return MarkedFunction(func, pythonpacks=[pack_item])

    return func_wrapper


def with_running_options(
    *,
    engine: Optional[str] = None,
    cpu: Optional[int] = None,
    memory: Optional[int] = None,
    **kwargs,
):
    """Build a decorator that records the expected engine and resources.

    Parameters
    ----------
    engine
        Engine name; stored upper-cased. Empty or ``None`` means unset.
    cpu
        Expected cpu amount; must be greater than 0 when given.
    memory
        Expected memory amount; must be greater than 0 when given.
    **kwargs
        Additional entries merged into the expected-resources dict.

    Returns
    -------
    Callable
        A decorator. If ``engine``, ``cpu`` and ``memory`` are all unset the
        decorator returns the function unchanged.

    Raises
    ------
    ValueError
        If ``cpu`` or ``memory`` is given and is not greater than 0.
    """
    engine = engine.upper() if engine else None
    resources = {"cpu": cpu, "memory": memory, **kwargs}
    if cpu is not None and cpu <= 0:
        raise ValueError("cpu must be greater than 0")
    if memory is not None and memory <= 0:
        raise ValueError("memory must be greater than 0")

    def func_wrapper(func):
        # NOTE(review): extra ``kwargs`` alone do not trigger marking; only
        # engine/cpu/memory are inspected here. Confirm this is intended.
        if all(v is None for v in (engine, cpu, memory)):
            return func
        if isinstance(func, MarkedFunction):
            # Overwrites any running options set previously on the wrapper.
            func.expect_engine = engine
            func.expect_resources = resources
            return func
        return MarkedFunction(func, expect_engine=engine, expect_resources=resources)

    return func_wrapper


# Alias of ``with_resources`` under a second name.
with_resource_libraries = with_resources


def get_udf_resources(
    func: Callable,
) -> List[Union[Resource, str]]:
    """Return the ``resources`` attached to ``func``, or an empty list."""
    return getattr(func, "resources", None) or []


def get_udf_pythonpacks(func: Callable) -> List[PythonPackOptions]:
    """Return the ``pythonpacks`` attached to ``func``, or an empty list."""
    return getattr(func, "pythonpacks", None) or []