# pai/api/pipeline_run.py (168 lines of code) (raw):
# Copyright 2023 Alibaba, Inc. or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..common.yaml_utils import dump as yaml_dump
from ..libs.alibabacloud_paiflow20210202.models import (
CreateRunRequest,
GetNodeRequest,
GetNodeResponseBody,
GetRunRequest,
GetRunResponseBody,
ListNodeLogsRequest,
ListNodeLogsResponseBody,
ListNodeOutputsRequest,
ListNodeOutputsResponseBody,
ListRunsRequest,
ListRunsResponseBody,
UpdateRunRequest,
)
from .base import ServiceName, WorkspaceScopedResourceAPI
class PipelineRunAPI(WorkspaceScopedResourceAPI):
    """Pipeline-run operations exposed by the PAIFlow backend service.

    Each public method builds the matching request model (when one is
    needed) and hands it to ``self._do_request`` together with the client
    method name stored in the corresponding ``_*_method`` class attribute.
    """

    BACKEND_SERVICE_NAME = ServiceName.PAIFLOW

    # Client method names that ``_do_request`` is asked to invoke.
    _list_method = "list_runs_with_options"
    _get_method = "get_run_with_options"
    _create_method = "create_run_with_options"
    _start_method = "start_run_with_options"
    _terminate_method = "terminate_run_with_options"
    _update_method = "update_run_with_options"
    _get_node_method = "get_node_with_options"
    _list_node_logs_method = "list_node_logs_with_options"
    _list_node_outputs_method = "list_node_outputs_with_options"

    def list(
        self,
        name=None,
        run_id=None,
        pipeline_id=None,
        status=None,
        sort_by=None,
        order=None,
        page_number=None,
        page_size=None,
        experiment_id=None,
        source=None,
        **kwargs,
    ):
        """List pipeline runs.

        Every named argument is forwarded unchanged to ``ListRunsRequest``
        under the same keyword; any extra keyword arguments are forwarded
        to the request model as well.

        Returns:
            The result of ``self.make_paginated_result`` applied to the
            response converted with ``to_map()``.
        """
        list_request = ListRunsRequest(
            name=name,
            run_id=run_id,
            pipeline_id=pipeline_id,
            status=status,
            sort_by=sort_by,
            order=order,
            page_number=page_number,
            page_size=page_size,
            experiment_id=experiment_id,
            source=source,
            **kwargs,
        )
        runs_resp: ListRunsResponseBody = self._do_request(
            method_=self._list_method, request=list_request
        )
        runs_map = runs_resp.to_map()
        return self.make_paginated_result(runs_map)

    def get(self, run_id):
        """Fetch a single pipeline run.

        Args:
            run_id: Identifier of the run, passed through to the request.

        Returns:
            The response body converted with ``to_map()``.
        """
        run_resp: GetRunResponseBody = self._do_request(
            method_=self._get_method, run_id=run_id, request=GetRunRequest()
        )
        return run_resp.to_map()

    def create(
        self,
        name,
        pipeline_id=None,
        manifest=None,
        arguments=None,
        env=None,
        no_confirm_required=False,
        source="SDK",
    ):
        """Create a pipeline run from a pipeline id or from a manifest.

        Exactly one of ``pipeline_id`` and ``manifest`` has to be supplied.
        ``arguments`` and ``env`` are bundled into one mapping and
        serialized with ``yaml_dump``; a non-empty ``dict`` manifest is
        serialized the same way, any other manifest value is sent as given.

        Args:
            name: Name of the run; must be truthy.
            pipeline_id: Identifier of the pipeline to run.
            manifest: Pipeline manifest, either a ``dict`` or a value that
                is forwarded as-is.
            arguments: Value stored under the ``"arguments"`` key of the
                serialized run arguments.
            env: Value stored under the ``"env"`` key of the serialized
                run arguments.
            no_confirm_required: Forwarded to ``CreateRunRequest``.
            source: Forwarded to ``CreateRunRequest``.

        Returns:
            The ``run_id`` attribute of the response.

        Raises:
            ValueError: If neither or both of ``pipeline_id`` and
                ``manifest`` are given, or if ``name`` is falsy.
        """
        # Guard clauses: reject invalid combinations before serializing.
        if not (pipeline_id or manifest):
            raise ValueError(
                "Create pipeline run require either pipeline_id or manifest."
            )
        if pipeline_id and manifest:
            raise ValueError(
                "Both pipeline_id and manifest are provide, create_run need only one."
            )
        if not name:
            raise ValueError("Pipeline run instance need a name.")

        serialized_args = yaml_dump({"arguments": arguments, "env": env})

        # Only a non-empty dict manifest is serialized here.
        if manifest and isinstance(manifest, dict):
            pipeline_manifest = yaml_dump(manifest)
        else:
            pipeline_manifest = manifest

        create_request = CreateRunRequest(
            name=name,
            pipeline_id=pipeline_id,
            pipeline_manifest=pipeline_manifest,
            arguments=serialized_args,
            no_confirm_required=no_confirm_required,
            source=source,
        )
        created = self._do_request(self._create_method, request=create_request)
        return created.run_id

    def start(self, run_id):
        """Issue the start request for the run identified by ``run_id``."""
        self._do_request(self._start_method, run_id=run_id)

    def terminate_run(self, run_id):
        """Issue the terminate request for the run identified by ``run_id``."""
        self._do_request(self._terminate_method, run_id=run_id)

    def update(self, run_id, name):
        """Send an update request carrying ``name`` for the given run."""
        rename_request = UpdateRunRequest(name=name)
        self._do_request(
            self._update_method, run_id=run_id, request=rename_request
        )

    def get_node(self, run_id, node_id, depth=2):
        """Fetch a node of a pipeline run.

        Args:
            run_id: Identifier of the run.
            node_id: Identifier of the node inside the run.
            depth: Forwarded to ``GetNodeRequest``; defaults to 2.

        Returns:
            The response body converted with ``to_map()``.
        """
        node_request = GetNodeRequest(depth=depth)
        node_resp: GetNodeResponseBody = self._do_request(
            method_=self._get_node_method,
            run_id=run_id,
            node_id=node_id,
            request=node_request,
        )
        return node_resp.to_map()

    def list_node_logs(
        self,
        run_id,
        node_id,
        from_time=None,
        to_time=None,
        keyword=None,
        reverse=False,
        page_offset=0,
        page_size=100,
    ):
        """List log entries of a node in a pipeline run.

        Args:
            run_id: Identifier of the run.
            node_id: Identifier of the node inside the run.
            from_time: Sent as ``from_time_in_seconds``.
            to_time: Sent as ``to_time_in_seconds``.
            keyword: Forwarded to the request as ``keyword``.
            reverse: Forwarded to the request as ``reverse``.
            page_offset: Sent as ``offset``.
            page_size: Forwarded to the request as ``page_size``.

        Returns:
            The result of ``self.make_paginated_result`` applied to the
            response converted with ``to_map()``.
        """
        logs_request = ListNodeLogsRequest(
            from_time_in_seconds=from_time,
            to_time_in_seconds=to_time,
            keyword=keyword,
            reverse=reverse,
            offset=page_offset,
            page_size=page_size,
        )
        logs_resp: ListNodeLogsResponseBody = self._do_request(
            self._list_node_logs_method,
            run_id=run_id,
            node_id=node_id,
            request=logs_request,
        )
        logs_map = logs_resp.to_map()
        return self.make_paginated_result(logs_map)

    def list_node_outputs(
        self,
        run_id,
        node_id,
        depth=2,
        name=None,
        sort_by=None,
        order=None,
        type=None,
        page_number=1,
        page_size=50,
    ):
        """List outputs of a node in a pipeline run.

        Apart from ``run_id`` and ``node_id``, every argument is forwarded
        unchanged to ``ListNodeOutputsRequest`` under the same keyword.

        Returns:
            The result of ``self.make_paginated_result`` applied to the
            response converted with ``to_map()``.
        """
        outputs_request = ListNodeOutputsRequest(
            depth=depth,
            name=name,
            sort_by=sort_by,
            order=order,
            type=type,
            page_number=page_number,
            page_size=page_size,
        )
        outputs_resp: ListNodeOutputsResponseBody = self._do_request(
            self._list_node_outputs_method,
            run_id=run_id,
            node_id=node_id,
            request=outputs_request,
        )
        outputs_map = outputs_resp.to_map()
        return self.make_paginated_result(outputs_map)