pai/api/job.py (116 lines of code) (raw):

# Copyright 2023 Alibaba, Inc. or its affiliates. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from datetime import datetime from typing import Any, Dict, List from ..common.logging import get_logger from ..libs.alibabacloud_pai_dlc20201203.models import ( GetJobEventsRequest, GetJobEventsResponseBody, GetPodLogsRequest, GetPodLogsResponseBody, ListEcsSpecsRequest, ListJobsRequest, ListJobsResponseBody, ) from .base import PaginatedResult, ServiceName, WorkspaceScopedResourceAPI logger = get_logger(__name__) class JobAPI(WorkspaceScopedResourceAPI): """Class which provide API to operate job resource.""" BACKEND_SERVICE_NAME = ServiceName.PAI_DLC _list_method = "list_jobs_with_options" _get_method = "get_job_with_options" _delete_method = "delete_job_with_options" _create_method = "create_job_with_options" _stop_method = "stop_job_with_options" _list_ecs_spec_method = "list_ecs_specs_with_options" _get_job_events_method = "get_job_events_with_options" _get_pod_logs_method = "get_pod_logs_with_options" _DEFAULT_PAGE_SIZE = 50 _DEFAULT_PAGE_NUMBER = 1 @classmethod def _generate_display_name(cls, job): return "{}-{}".format( type(job).__name__, datetime.now().isoformat(sep="-", timespec="seconds") ) def list( self, display_name=None, start_time=None, end_time=None, resource_id=None, sort_by=None, order=None, status=None, tags=None, page_number=1, page_size=50, ) -> PaginatedResult: """List the DlcJob resource which match given condition. Args: display_name: Display name of the job, support fuzz matching. start_time: Start time of the job end_time: resource_id: status: sort_by: order: tags: page_number: page_size: Returns: """ request = ListJobsRequest( display_name=display_name, start_time=start_time, end_time=end_time, page_size=page_size, page_number=page_number, resource_id=resource_id, sort_by=sort_by, order=order, status=status, tags=tags, ) resp: ListJobsResponseBody = self._do_request( method_=self._list_method, tmp_req=request ) return self.make_paginated_result(resp) def get_api_object_by_resource_id(self, resource_id): resp = self._do_request(method_=self._get_method, job_id=resource_id) return resp.to_map() def get(self, id: str) -> Dict[str, Any]: """Get the DlcJob resource by job_id. Args: id: ID of the Job. Returns: DlcJob: """ return self.get_api_object_by_resource_id(resource_id=id) def delete(self, id: str) -> None: """Delete the job.""" self._do_request(method_=self._delete_method, job_id=id) def stop(self, id: str) -> None: """Stop the job.""" self._do_request(method_=self._stop_method, job_id=id) def list_events(self, id, start_time=None, end_time=None, max_events_num=2000): """Get Events of the DLC Job. Args: id (str): Job Id start_time: Start time of job events range. end_time: End time of job events range. max_events_num: Max event number return from the response. Returns: List[str]: List of job events. """ # TODO(LiangQuan): start_time/end_time support type datatime. request = GetJobEventsRequest( start_time=start_time, end_time=end_time, max_events_num=max_events_num ) result: GetJobEventsResponseBody = self._do_request( method_=self._get_job_events_method, job_id=id, request=request ) return result.events def list_logs( self, job_id, pod_id, end_time: datetime = None, start_time: datetime = None, max_lines: int = None, pod_uid: str = None, ) -> List[str]: """List logs of a specific job pod.""" request = GetPodLogsRequest( end_time=end_time, max_lines=max_lines, pod_uid=pod_uid, start_time=start_time, ) result: GetPodLogsResponseBody = self._do_request( method_=self._get_pod_logs_method, job_id=job_id, pod_id=pod_id, request=request, ) return result.logs def list_ecs_specs( self, accelerator_type=None, order="asc", page_number=1, page_size=10, sort_by="Gpu", ) -> PaginatedResult: """List EcsSpecs that DLC service provided.""" request = ListEcsSpecsRequest( accelerator_type=accelerator_type, order=order, page_number=page_number, page_size=page_size, sort_by=sort_by, ) result = self._do_request(method_=self._list_ecs_spec_method, request=request) return self.make_paginated_result(result, item_key="EcsSpecs")