src/pydolphinscheduler/core/task.py (447 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """DolphinScheduler Task and TaskRelation object.""" from __future__ import annotations import copy import types import warnings from collections.abc import Sequence from datetime import timedelta from logging import getLogger from pydolphinscheduler import configuration from pydolphinscheduler.constants import ( Delimiter, IsCache, ResourceKey, Symbol, TaskFlag, TaskPriority, TaskTimeoutFlag, ) from pydolphinscheduler.core.parameter import BaseDataType, Direction, ParameterHelper from pydolphinscheduler.core.resource import Resource from pydolphinscheduler.core.resource_plugin import ResourcePlugin from pydolphinscheduler.core.workflow import Workflow, WorkflowContext from pydolphinscheduler.exceptions import PyDSParamException, PyResPluginException from pydolphinscheduler.java_gateway import gateway from pydolphinscheduler.models import Base from pydolphinscheduler.utils.date import timedelta2timeout logger = getLogger(__name__) class TaskRelation(Base): """TaskRelation object, describe the relation of exactly two tasks.""" # Add attr `_KEY_ATTR` to overwrite :func:`__eq__`, it is make set # `Task.workflow._task_relations` work correctly. _KEY_ATTR = { "pre_task_code", "post_task_code", } _DEFINE_ATTR = { "pre_task_code", "post_task_code", } _DEFAULT_ATTR = { "name": "", "preTaskVersion": 1, "postTaskVersion": 1, "conditionType": 0, "conditionParams": {}, } def __init__( self, pre_task_code: int, post_task_code: int, name: str | None = None, ): super().__init__(name) self.pre_task_code = pre_task_code self.post_task_code = post_task_code def __hash__(self): return hash(f"{self.pre_task_code} {Delimiter.DIRECTION} {self.post_task_code}") class Task(Base): """Task object, parent class for all exactly task type. :param name: The name of the task. Node names within the same workflow must be unique. :param task_type: :param description: default None :param flag: default TaskFlag.YES, :param task_priority: default TaskPriority.MEDIUM :param worker_group: default configuration.WORKFLOW_WORKER_GROUP :param environment_name: default None :param task_group_id: Identify of task group to restrict the parallelism of tasks instance run, default 0. :param task_group_priority: Priority for same task group to, the higher the value, the higher the priority, default 0. :param delay_time: deault 0 :param fail_retry_times: default 0 :param fail_retry_interval: default 1 :param timeout_notify_strategy: default, None :param timeout: Timeout attribute for task, in minutes. Task is consider as timed out task when the running time of a task exceeds than this value. when data type is :class:`datetime.timedelta` will be converted to int(in minutes). default ``None`` :param resource_list: default None :param wait_start_timeout: default None :param condition_result: default None, :param resource_plugin: default None :param is_cache: default False :param input_params: default None, input parameters, {param_name: param_value} :param output_params: default None, input parameters, {param_name: param_value} """ _DEFINE_ATTR = { "name", "code", "version", "task_type", "task_params", "description", "flag", "task_priority", "worker_group", "environment_code", "delay_time", "fail_retry_times", "fail_retry_interval", "task_group_id", "task_group_priority", "timeout_flag", "timeout_notify_strategy", "timeout", "is_cache", } # task default attribute will into `task_params` property _task_default_attr = { "local_params", "resource_list", "dependence", "wait_start_timeout", "condition_result", } # task attribute ignore from _task_default_attr and will not into `task_params` property _task_ignore_attr: set = set() # task custom attribute define in sub class and will append to `task_params` property _task_custom_attr: set = set() ext: set = None ext_attr: str | types.FunctionType = None DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]} def __init__( self, name: str, task_type: str, description: str | None = None, flag: str | None = TaskFlag.YES, task_priority: str | None = TaskPriority.MEDIUM, worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP, environment_name: str | None = None, task_group_id: int | None = 0, task_group_priority: int | None = 0, delay_time: int | None = 0, fail_retry_times: int | None = 0, fail_retry_interval: int | None = 1, timeout_notify_strategy: str | None = None, timeout: timedelta | int | None = None, workflow: Workflow | None = None, resource_list: list | None = None, dependence: dict | None = None, wait_start_timeout: dict | None = None, condition_result: dict | None = None, resource_plugin: ResourcePlugin | None = None, is_cache: bool | None = False, input_params: dict | None = None, output_params: dict | None = None, *args, **kwargs, ): super().__init__(name, description) self.task_type = task_type self.flag = flag self._is_cache = is_cache self.task_priority = task_priority self.worker_group = worker_group self._environment_name = environment_name self.task_group_id = task_group_id self.task_group_priority = task_group_priority self.fail_retry_times = fail_retry_times self.fail_retry_interval = fail_retry_interval self.delay_time = delay_time self.timeout_notify_strategy = timeout_notify_strategy self._timeout: timedelta | int = timeout self._workflow = None self._input_params = input_params or {} self._output_params = output_params or {} if "process_definition" in kwargs: warnings.warn( "The `process_definition` parameter is deprecated, please use `workflow` instead.", DeprecationWarning, ) self.workflow = kwargs.pop("process_definition") else: self.workflow: Workflow = workflow or WorkflowContext.get() if "local_params" in kwargs: warnings.warn( """The `local_params` parameter is deprecated, please use `input_params` and `output_params` instead. """, DeprecationWarning, ) self._local_params = kwargs.get("local_params") self._upstream_task_codes: set[int] = set() self._downstream_task_codes: set[int] = set() self._task_relation: set[TaskRelation] = set() # move attribute code and version after _workflow and workflow declare self.code, self.version = self.gen_code_and_version() # Add task to workflow, maybe we could put into property workflow latter if self.workflow is not None and self.code not in self.workflow.tasks: self.workflow.add_task(self) else: logger.warning( "Task code %d already in workflow, prohibit re-add task.", self.code, ) # Attribute for task param self._resource_list = resource_list or [] self.dependence = dependence or {} self.wait_start_timeout = wait_start_timeout or {} self._condition_result = condition_result or self.DEFAULT_CONDITION_RESULT self.resource_plugin = resource_plugin self.get_content() @property def workflow(self) -> Workflow | None: """Get attribute workflow.""" return self._workflow @workflow.setter def workflow(self, workflow: Workflow | None): """Set attribute workflow.""" self._workflow = workflow @property def timeout(self) -> int: """Get attribute timeout.""" if isinstance(self._timeout, int): if self._timeout < 0: raise PyDSParamException("The timeout value must be greater than 0") return self._timeout return timedelta2timeout(self._timeout) if self._timeout else 0 @property def timeout_flag(self) -> str: """Whether the timeout attribute is being set or not.""" return TaskTimeoutFlag.ON if self._timeout else TaskTimeoutFlag.OFF @property def is_cache(self) -> str: """Whether the cache is being set or not.""" if isinstance(self._is_cache, bool): return IsCache.YES if self._is_cache else IsCache.NO else: raise PyDSParamException("is_cache must be a bool") @property def resource_list(self) -> list[dict[str, Resource]]: """Get task define attribute `resource_list`.""" resources = set() for res in self._resource_list: if isinstance(res, str): resources.add( Resource( name=res, user_name=self.user_name ).get_fullname_from_database() ) elif isinstance(res, dict) and ResourceKey.NAME in res: warnings.warn( """`resource_list` should be defined using List[str] with resource paths, the use of ids to define resources will be remove in version 3.2.0. """, DeprecationWarning, stacklevel=2, ) resources.add(res.get(ResourceKey.NAME)) return [{ResourceKey.NAME: r} for r in resources] @property def user_name(self) -> str | None: """Return username of workflow.""" if self.workflow: return self.workflow.user.name else: raise PyDSParamException("`user_name` cannot be empty.") @property def condition_result(self) -> dict: """Get attribute condition_result.""" return self._condition_result @condition_result.setter def condition_result(self, condition_result: dict | None): """Set attribute condition_result.""" self._condition_result = condition_result def _get_attr(self) -> set[str]: """Get final task task_params attribute. Base on `_task_default_attr`, append attribute from `_task_custom_attr` and subtract attribute from `_task_ignore_attr`. """ attr = copy.deepcopy(self._task_default_attr) attr -= self._task_ignore_attr attr |= self._task_custom_attr return attr @property def task_params(self) -> dict | None: """Get task parameter object. Will get result to combine _task_custom_attr and custom_attr. """ custom_attr = self._get_attr() return self.get_define_custom(custom_attr=custom_attr) def get_plugin(self): """Return the resource plug-in. according to parameter resource_plugin and parameter workflow.resource_plugin. """ if self.resource_plugin is None: if self.workflow.resource_plugin is not None: return self.workflow.resource_plugin else: raise PyResPluginException( "The execution command of this task is a file, but the resource plugin is empty" ) else: return self.resource_plugin def get_content(self): """Get the file content according to the resource plugin.""" if self.ext_attr is None and self.ext is None: return _ext_attr = getattr(self, self.ext_attr) if _ext_attr is not None: if isinstance(_ext_attr, str) and _ext_attr.endswith(tuple(self.ext)): res = self.get_plugin() content = res.read_file(_ext_attr) setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), content) else: if self.resource_plugin is not None or ( self.workflow is not None and self.workflow.resource_plugin is not None ): index = _ext_attr.rfind(Symbol.POINT) if index != -1: raise ValueError( f"This task does not support files with suffix {_ext_attr[index:]}," f"only supports {Symbol.COMMA.join(str(suf) for suf in self.ext)}" ) setattr(self, self.ext_attr.lstrip(Symbol.UNDERLINE), _ext_attr) def __hash__(self): return hash(self.code) def __lshift__(self, other: Task | Sequence[Task]): """Implement Task << Task.""" self.set_upstream(other) return other def __rshift__(self, other: Task | Sequence[Task]): """Implement Task >> Task.""" self.set_downstream(other) return other def __rrshift__(self, other: Task | Sequence[Task]): """Call for Task >> [Task] because list don't have __rshift__ operators.""" self.__lshift__(other) return self def __rlshift__(self, other: Task | Sequence[Task]): """Call for Task << [Task] because list don't have __lshift__ operators.""" self.__rshift__(other) return self def _set_deps(self, tasks: Task | Sequence[Task], upstream: bool = True) -> None: """Set parameter tasks dependent to current task. it is a wrapper for :func:`set_upstream` and :func:`set_downstream`. """ if not isinstance(tasks, Sequence): tasks = [tasks] for task in tasks: if upstream: self._upstream_task_codes.add(task.code) task._downstream_task_codes.add(self.code) if self._workflow: task_relation = TaskRelation( pre_task_code=task.code, post_task_code=self.code, name=f"{task.name} {Delimiter.DIRECTION} {self.name}", ) self.workflow._task_relations.add(task_relation) else: self._downstream_task_codes.add(task.code) task._upstream_task_codes.add(self.code) if self._workflow: task_relation = TaskRelation( pre_task_code=self.code, post_task_code=task.code, name=f"{self.name} {Delimiter.DIRECTION} {task.name}", ) self.workflow._task_relations.add(task_relation) def set_upstream(self, tasks: Task | Sequence[Task]) -> None: """Set parameter tasks as upstream to current task.""" self._set_deps(tasks, upstream=True) def set_downstream(self, tasks: Task | Sequence[Task]) -> None: """Set parameter tasks as downstream to current task.""" self._set_deps(tasks, upstream=False) # TODO code should better generate in bulk mode when :ref: workflow run submit or start def gen_code_and_version(self) -> tuple: """Generate task code and version from java gateway. If task name do not exists in workflow before, if will generate new code and version id equal to 0 by java gateway, otherwise if will return the exists code and version. """ # TODO get code from specific project workflow and task name result = gateway.get_code_and_version( self.workflow._project, self.workflow.name, self.name ) # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT) # gateway_result_checker(result) return result.get("code"), result.get("version") @property def environment_code(self) -> str: """Convert environment name to code.""" if self._environment_name is None: return None return gateway.query_environment_info(self._environment_name) @property def local_params(self): """Convert local params.""" local_params = ( copy.deepcopy(self._local_params) if hasattr(self, "_local_params") else [] ) local_params.extend( ParameterHelper.convert_params(self._input_params, Direction.IN) ) local_params.extend( ParameterHelper.convert_params(self._output_params, Direction.OUT) ) return local_params def add_in( self, name: str, value: int | str | float | bool | BaseDataType | None = None, ): """Add input parameters. :param name: name of the input parameter. :param value: value of the input parameter. It could be simply command:: task.add_in("a") task.add_in("b", 123) task.add_in("c", bool) task.add_in("d", ParameterType.LONG(123)) """ self._input_params[name] = value def add_out( self, name: str, value: int | str | float | bool | BaseDataType | None = None, ): """Add output parameters. :param name: name of the output parameter. :param value: value of the output parameter. It could be simply command:: task.add_out("a") task.add_out("b", 123) task.add_out("c", bool) task.add_out("d", ParameterType.LONG(123)) """ self._output_params[name] = value class BatchTask(Task): """Task object, parent class for all exactly task type. :param name: The name of the task. Node names within the same workflow must be unique. :param task_type: :param description: default None :param flag: default TaskFlag.YES, :param task_priority: default TaskPriority.MEDIUM :param worker_group: default configuration.WORKFLOW_WORKER_GROUP :param environment_name: default None :param task_group_id: Identify of task group to restrict the parallelism of tasks instance run, default 0. :param task_group_priority: Priority for same task group to, the higher the value, the higher the priority, default 0. :param delay_time: deault 0 :param fail_retry_times: default 0 :param fail_retry_interval: default 1 :param timeout_notify_strategy: default, None :param timeout: Timeout attribute for task, in minutes. Task is consider as timed out task when the running time of a task exceeds than this value. when data type is :class:`datetime.timedelta` will be converted to int(in minutes). default ``None`` :param resource_list: default None :param wait_start_timeout: default None :param condition_result: default None, :param resource_plugin: default None :param is_cache: default False :param input_params: default None, input parameters, {param_name: param_value} :param output_params: default None, input parameters, {param_name: param_value} """ _DEFINE_ATTR = Task._DEFINE_ATTR | {"task_execute_type"} def __init__( self, name: str, task_type: str, description: str | None = None, flag: str | None = TaskFlag.YES, task_priority: str | None = TaskPriority.MEDIUM, worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP, environment_name: str | None = None, task_group_id: int | None = 0, task_group_priority: int | None = 0, delay_time: int | None = 0, fail_retry_times: int | None = 0, fail_retry_interval: int | None = 1, timeout_notify_strategy: str | None = None, timeout: timedelta | int | None = None, workflow: Workflow | None = None, resource_list: list | None = None, dependence: dict | None = None, wait_start_timeout: dict | None = None, condition_result: dict | None = None, resource_plugin: ResourcePlugin | None = None, is_cache: bool | None = False, input_params: dict | None = None, output_params: dict | None = None, *args, **kwargs, ): super().__init__( name, task_type, description, flag, task_priority, worker_group, environment_name, task_group_id, task_group_priority, delay_time, fail_retry_times, fail_retry_interval, timeout_notify_strategy, timeout, workflow, resource_list, dependence, wait_start_timeout, condition_result, resource_plugin, is_cache, input_params, output_params, *args, **kwargs, ) self.task_execute_type = "BATCH" class StreamTask(Task): """Task object, parent class for all exactly task type. :param name: The name of the task. Node names within the same workflow must be unique. :param task_type: :param description: default None :param flag: default TaskFlag.YES, :param task_priority: default TaskPriority.MEDIUM :param worker_group: default configuration.WORKFLOW_WORKER_GROUP :param environment_name: default None :param task_group_id: Identify of task group to restrict the parallelism of tasks instance run, default 0. :param task_group_priority: Priority for same task group to, the higher the value, the higher the priority, default 0. :param delay_time: deault 0 :param fail_retry_times: default 0 :param fail_retry_interval: default 1 :param timeout_notify_strategy: default, None :param timeout: Timeout attribute for task, in minutes. Task is consider as timed out task when the running time of a task exceeds than this value. when data type is :class:`datetime.timedelta` will be converted to int(in minutes). default ``None`` :param resource_list: default None :param wait_start_timeout: default None :param condition_result: default None, :param resource_plugin: default None :param is_cache: default False :param input_params: default None, input parameters, {param_name: param_value} :param output_params: default None, input parameters, {param_name: param_value} """ _DEFINE_ATTR = Task._DEFINE_ATTR | {"task_execute_type"} def __init__( self, name: str, task_type: str, description: str | None = None, flag: str | None = TaskFlag.YES, task_priority: str | None = TaskPriority.MEDIUM, worker_group: str | None = configuration.WORKFLOW_WORKER_GROUP, environment_name: str | None = None, task_group_id: int | None = 0, task_group_priority: int | None = 0, delay_time: int | None = 0, fail_retry_times: int | None = 0, fail_retry_interval: int | None = 1, timeout_notify_strategy: str | None = None, timeout: timedelta | int | None = None, workflow: Workflow | None = None, resource_list: list | None = None, dependence: dict | None = None, wait_start_timeout: dict | None = None, condition_result: dict | None = None, resource_plugin: ResourcePlugin | None = None, is_cache: bool | None = False, input_params: dict | None = None, output_params: dict | None = None, *args, **kwargs, ): super().__init__( name, task_type, description, flag, task_priority, worker_group, environment_name, task_group_id, task_group_priority, delay_time, fail_retry_times, fail_retry_interval, timeout_notify_strategy, timeout, workflow, resource_list, dependence, wait_start_timeout, condition_result, resource_plugin, is_cache, input_params, output_params, *args, **kwargs, ) self.task_execute_type = "STREAM"