cupid/rpc.py (179 lines of code) (raw):
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import uuid
import os
import sys
import logging
import warnings
import struct
import time
from google.protobuf import service as pb_service
from odps import compat
from odps.compat import six
from odps.errors import InvalidStateSetting, NoSuchObject
from . import errors
from .config import options
from .utils import get_property
try:
from .proto import cupidtaskparam_pb2
except TypeError:
warnings.warn('Cannot import protos from pycupid: '
'consider upgrading your protobuf python package.', ImportWarning)
raise ImportError
# Module logger, plus a second logger under the Mars extension's Cupid namespace.
logger = logging.getLogger(__name__)
logger_mars = logging.getLogger('odps.mars_extension.cupid')

# Python 2 only provides ``base64.decodestring``; Python 3 provides
# ``base64.decodebytes`` (``decodestring`` was removed in Python 3.9), so pick
# whichever the running interpreter offers.
b64decodebytes = base64.decodebytes if sys.version_info[0] >= 3 else base64.decodestring
class CupidRpcController(pb_service.RpcController):
    """RPC controller that only tracks a failure message.

    Cancellation is not supported: ``StartCancel`` and ``NotifyOnCancel`` do
    nothing and ``IsCanceled`` always answers ``False``.
    """

    def __init__(self):
        super(CupidRpcController, self).__init__()
        # None means "no failure recorded"; otherwise holds the failure reason.
        self._fail_msg = None

    def Reset(self):
        """Forget any previously recorded failure."""
        self._fail_msg = None

    def Failed(self):
        """Return True once ``SetFailed`` has recorded a reason."""
        recorded = self._fail_msg
        return recorded is not None

    def ErrorText(self):
        """Return the recorded failure reason, or None when nothing failed."""
        return self._fail_msg

    def SetFailed(self, reason):
        """Record ``reason`` as the failure message."""
        self._fail_msg = reason

    def StartCancel(self):
        """Cancellation is unsupported; this is a no-op."""
        pass

    def IsCanceled(self):
        """Always False: calls are never cancelled through this controller."""
        return False

    def NotifyOnCancel(self, callback):
        """Cancellation is unsupported; ``callback`` is never invoked."""
        pass
class CupidRpcChannel(pb_service.RpcChannel):
    """Base channel with helpers to submit Cupid tasks to ODPS and poll them.

    Subclasses implement ``CallMethod``. The helpers here upload a serialized
    task parameter as an ODPS resource, create a ``CupidTask`` instance for
    it, and poll the instance's Cupid detail until a target status is reached.
    """

    @staticmethod
    def submit_job(param_pb, running_mode, priority=None, with_resource=False, session=None,
                   running_cluster=None, task_name='cupid_task'):
        """Submit ``param_pb`` as a Cupid task and return the created instance.

        :param param_pb: protobuf message; its serialized bytes are uploaded
            as a ``cupid_plan_<uuid>`` file resource.
        :param running_mode: running mode string appended to the task info.
        :param priority: priority passed to ``instances.create``.
        :param with_resource: when False the plan resource is created with
            ``is_temp_resource=True``.
        :param session: optional object whose ``odps`` attribute is used as
            the ODPS entry; otherwise ``ODPS.from_global()`` is used.
        :param running_cluster: cluster passed to ``instances.create``.
        :param task_name: name of the created ``CupidTask``.
        :return: the instance, re-fetched through ``odps.get_instance``.
        """
        from odps import ODPS, compat
        from odps.models import CupidTask

        if logger.getEffectiveLevel() <= logging.DEBUG:
            param_pb_str = str(param_pb)
            if isinstance(param_pb, cupidtaskparam_pb2.CupidTaskParam):
                # mask credentials before they reach the debug log
                masks = {
                    'odps.access.id': '** access-id **',
                    'odps.access.key': '** access-key **',
                }
                for conf in param_pb.jobconf.jobconfitem:
                    mask = masks.get(conf.key)
                    # skip empty values: str.replace('') would splice the mask
                    # between every character of the dump
                    if mask and conf.value:
                        param_pb_str = param_pb_str.replace(conf.value, mask)
            logger.debug('Job param proto: %s', param_pb_str)

        odps = session.odps if session else ODPS.from_global()

        plan_string = param_pb.SerializeToString()
        res_kw = dict(fileobj=compat.BytesIO(plan_string))
        if not with_resource:
            res_kw['is_temp_resource'] = True
        res = odps.create_resource('cupid_plan_' + str(uuid.uuid4()), 'file', **res_kw)

        task_info = ','.join([res.name, odps.project, running_mode])

        props = dict()
        if options.default_task_settings:
            props.update(options.default_task_settings)
        if options.cupid.application_type:
            props['odps.cupid.application.type'] = options.cupid.application_type
            props['odps.moye.runtime.type'] = options.cupid.application_type
        if options.biz_id:
            props['biz_id'] = options.biz_id
        if options.cupid.major_task_version:
            props['odps.task.major.version'] = options.cupid.major_task_version

        # settings from the execution context file (when present) override the above
        context_file = get_property('odps.exec.context.file')
        if context_file and os.path.exists(context_file):
            with open(context_file, 'r') as cf:
                file_settings = json.load(cf).get('settings', {})
            props.update(file_settings)

        task = CupidTask(task_name, task_info, props)
        inst = odps.get_project().instances.create(task=task, priority=priority,
                                                   running_cluster=running_cluster)
        return odps.get_instance(inst.id)

    @staticmethod
    def get_cupid_detail(inst):
        """Return the raw ``instancedetail`` payload (bytes) of the ``cupid_task`` task.

        If the first request returns ``b'{}'``, ``get_task_detail2`` is called
        once and the request is repeated.
        """
        inst.reload()
        params = {'instancedetail': '', 'taskname': 'cupid_task'}
        resp = inst._client.get(inst.resource(), params=params)
        if resp.content == b'{}':
            logger.debug('Empty content retrieved. Use get_task_detail2 first.')
            inst.get_task_detail2('cupid_task')
            resp = inst._client.get(inst.resource(), params=params)
        return resp.content

    @classmethod
    def get_cupid_status(cls, inst):
        """Fetch and decode the Cupid detail of ``inst``.

        :return: a ``CupidTaskDetailResultParam``. When none of the known
            status fields is set, ``ready`` is filled in.
        :raises errors.InstanceRecycledError: when the detail is a plain-text
            ``Failed...recycled...`` message instead of a base64 protobuf.
        """
        details = cls.get_cupid_detail(inst)
        if details.startswith(b'Failed') and b'recycled' in details:
            raise errors.InstanceRecycledError(details)

        details_pb = cupidtaskparam_pb2.CupidTaskDetailResultParam()
        details_pb.ParseFromString(b64decodebytes(details))

        status_keys = ('ready', 'waiting', 'running', 'success', 'failed', 'cancelled')
        if not any(details_pb.HasField(sk) for sk in status_keys):
            details_pb.ready.CopyFrom(cupidtaskparam_pb2.Ready())
        return details_pb

    @classmethod
    def wait_cupid_instance(cls, inst, state=None, master_timeout=120):
        """Poll ``inst`` until its Cupid status has the ``state`` field set.

        :param inst: ODPS instance created by ``submit_job``.
        :param state: status field to wait for; defaults to ``'success'``.
        :param master_timeout: accepted for backward compatibility but not
            used. Polling has no time limit.
        :return: the ``<state>Msg`` attribute of the matched status message.
        :raises errors.CupidUserError: when the task is cancelled, or fails
            with a message starting with ``runTask failed:`` or
            ``app run failed!``.
        :raises errors.CupidError: for any other task failure.
        """
        state = state or 'success'
        # In-progress statuses that are polled silently. 'waiting' is one of
        # the statuses get_cupid_status recognizes, so it must not be reported
        # as unexpected on every poll.
        wait_keys = ('waiting', 'running', 'success', 'ready')
        sleep_time = 0
        while True:
            result = cls.get_cupid_status(inst)
            # linear back-off between polls, capped at 5 seconds
            sleep_time = min(5, sleep_time + 1)

            if result.HasField('failed'):
                if result.failed.HasField('cupidTaskFailed'):
                    msg = result.failed.cupidTaskFailed.cupidTaskFailedMsg
                else:
                    msg = result.failed.bizFailed.bizFailedMsg
                if msg.startswith(("runTask failed:", "app run failed!")):
                    raise errors.CupidUserError(msg)
                raise errors.CupidError(msg)

            if result.HasField('cancelled'):
                raise errors.CupidUserError('Instance canceled.')

            if result.HasField(state):
                return getattr(getattr(result, state), state + 'Msg')

            if not any(result.HasField(wk) for wk in wait_keys):
                msg_pb = str(result)
                if msg_pb != '':
                    logger.warning('Unexpected status: %s', msg_pb)
            time.sleep(sleep_time)
class CupidTaskServiceRpcChannel(CupidRpcChannel):
    """RPC channel that executes each call as a ``TaskServiceRequest`` Cupid task."""

    def __init__(self, session):
        super(CupidTaskServiceRpcChannel, self).__init__()
        # session providing ``job_conf()`` and used when submitting jobs
        self.cupid_session = session

    def CallMethod(self, method, rpc_controller,
                   request, response_class, done):
        """Run ``method`` remotely and return the decoded response.

        The request is wrapped in a ``TaskServiceRequest`` and submitted in
        ``eAsyncNotFuxiJob`` mode. The call then waits for the instance to
        succeed and parses the base64-decoded success message into
        ``response_class``. ``rpc_controller`` and ``done`` are not used, and
        errors raised while waiting (e.g. ``errors.CupidError``) propagate to
        the caller.
        """
        task_service_req = cupidtaskparam_pb2.TaskServiceRequest(
            methodName=method.name,
            requestInBytes=request.SerializeToString(),
        )
        job_conf = self.cupid_session.job_conf()
        task_operator = cupidtaskparam_pb2.CupidTaskOperator(moperator='TaskServiceRequest')
        task_param = cupidtaskparam_pb2.CupidTaskParam(
            mcupidtaskoperator=task_operator, jobconf=job_conf,
            taskServiceRequest=task_service_req
        )

        inst = self.submit_job(task_param, 'eAsyncNotFuxiJob', session=self.cupid_session)
        logger_mars.debug('Cupid task instance: %s, method: %s', inst.id, method.name)

        resp_str = self.wait_cupid_instance(inst)
        # the success message may come back as text; base64 decoding needs bytes
        if isinstance(resp_str, six.text_type):
            resp_str = resp_str.encode()
        resp = response_class()
        resp.ParseFromString(b64decodebytes(resp_str))
        return resp
class SandboxRpcChannel(pb_service.RpcChannel):
    """RPC channel that forwards calls through the runtime context's channel client."""

    def CallMethod(self, method, controller, request, response_class, done):
        """Invoke ``method`` synchronously through ``context().channel_client``.

        The payload sent to ``sync_call`` is the method index packed as a
        little-endian unsigned 32-bit integer, followed by the serialized
        request.

        :return: a ``response_class`` instance parsed from the reply. On
            failure the error is logged, reported through
            ``controller.SetFailed`` and ``None`` is returned.
        :raises Exception: the original error is re-raised when
            ``controller`` is None, since there is nowhere to report it.
        """
        from .runtime import context as get_context
        ctx = get_context()
        try:
            service_name = method.containing_service.full_name
            payload = struct.pack('<I', method.index) + request.SerializeToString()
            logger.debug('SandboxRpcChannel sync_call service: %s, method id: %d request: %s',
                         service_name, method.index, request)
            res = ctx.channel_client.sync_call(service_name, payload)
            resp = response_class()
            resp.ParseFromString(res)
            logger.debug('SandboxRpcChannel sync_call result: %s', resp)
            return resp
        except Exception as exc:
            logger.exception('SandboxRpcChannel CallMethod fail: %s', str(exc))
            if controller is None:
                # previously this path raised an unrelated AttributeError that
                # masked the real failure
                raise
            controller.SetFailed(str(exc))