cupid/session.py (143 lines of code) (raw):
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
import warnings
from odps.compat import six
from odps.utils import with_wait_argument
from .config import options
from .errors import CupidMasterTimeoutError
from .rpc import CupidTaskServiceRpcChannel
from .utils import get_environ
# The generated protobuf module is imported under a guard: a ``TypeError``
# raised while importing it is reported as an ``ImportWarning`` that points
# at the installed protobuf package, and then surfaced to the importer of
# this module as a plain ``ImportError``.
try:
    from .proto import cupidtaskparam_pb2 as task_param_pb
except TypeError:
    _proto_import_hint = (
        'Cannot import protos from pycupid: '
        'consider upgrading your protobuf python package.'
    )
    warnings.warn(_proto_import_hint, ImportWarning)
    raise ImportError

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Only entries of ``options.cupid.settings`` whose key starts with one of
# these prefixes are copied into a job configuration (see
# ``CupidSession.job_conf``).
_CUPID_CONF_PREFIXES = 'odps cupid'.split()
class CupidSession(object):
    """Session that submits Cupid tasks on behalf of an ODPS entry object.

    The session keeps the ODPS entry, the target project and the runtime
    context, builds ``JobConf`` protobuf messages and submits task
    parameters through ``CupidTaskServiceRpcChannel``.
    """

    def __init__(self, odps=None, project=None):
        """Initialise the session state.

        :param odps: ODPS entry object. When ``None``, the object returned
            by ``ODPS.from_global()`` is used instead.
        :param project: project name. When ``None``, ``self.odps.project``
            is used instead.
        """
        # Both imports are deliberately function-local, as in the original.
        from .runtime import context
        from odps import ODPS

        self._context = context()
        self.odps = odps if odps is not None else ODPS.from_global()

        if project is not None:
            self.project = project
        else:
            self.project = self.odps.project

        self.lookup_name = get_environ('META_LOOKUP_NAME', '')
        self.running = False
        self.save_id = None
        # Waited on by ``check_running``; nothing in this class sets it.
        self.job_running_event = threading.Event()
        # Filled lazily by ``get_instance_kube_api``.
        self._kube_url = None

    def check_running(self, timeout=None):
        """Return ``self.running``, waiting on the running event if needed.

        :param timeout: seconds to wait on ``job_running_event``. Any falsy
            value (``None`` or ``0``) is replaced with
            ``options.cupid.wait_am_start_time``.
        :return: the value of ``self.running`` after the optional wait.
        """
        wait_seconds = timeout or options.cupid.wait_am_start_time
        if self.running:
            return self.running
        self.job_running_event.wait(wait_seconds)
        return self.running

    @property
    def account(self):
        """Account object of the underlying ODPS entry."""
        # todo add support for bearer token
        return self.odps.account

    def job_conf(self, conf=None):
        """Build a ``JobConf`` protobuf message for a task submission.

        Entries are layered in this order, later layers overriding earlier
        ones: built-in defaults derived from the session and global options,
        then ``conf``, then every item of ``options.cupid.settings`` whose
        key starts with one of ``_CUPID_CONF_PREFIXES``. Entries whose value
        is ``None`` are left out; all other values are converted with
        ``str``.

        :param conf: optional mapping of extra configuration items.
        :return: ``task_param_pb.JobConf`` holding the resulting items.
        """
        odps_account = self.odps.account
        merged = {
            'odps.task.major.version': options.cupid.major_task_version,
            'odps.access.id': getattr(odps_account, 'access_id', None),
            'odps.access.key': getattr(odps_account, 'secret_access_key', None),
            'odps.end.point': self.odps.endpoint,
            'odps.project.name': self.project,
            'odps.moye.am.cores': '400',
            'odps.cupid.proxy.end.point': options.cupid.proxy_endpoint,
        }
        if conf:
            merged.update(conf)

        accepted_prefixes = tuple(_CUPID_CONF_PREFIXES)
        global_settings = options.cupid.settings or {}
        for name, setting in six.iteritems(global_settings):
            if name.startswith(accepted_prefixes):
                merged[name] = setting

        pb_items = []
        for name, setting in six.iteritems(merged):
            if setting is None:
                # unset entries are not sent at all
                continue
            pb_items.append(task_param_pb.JobConfItem(key=name, value=str(setting)))
        return task_param_pb.JobConf(jobconfitem=pb_items)

    def get_proxy_token(self, instance, app_name, expired_in_hours):
        """Submit a ``GetCupidProxyToken`` task and wait for its result.

        :param instance: an object with an ``id`` attribute, or the
            instance id itself.
        :param app_name: value passed as ``appName`` in the token request.
        :param expired_in_hours: value passed as ``expiredInHours`` in the
            token request.
        :return: whatever ``wait_cupid_instance`` returns for the submitted
            job (used by ``get_proxied_url`` as the host prefix).
        """
        instance_id = instance.id if hasattr(instance, 'id') else instance

        operator = task_param_pb.CupidTaskOperator(moperator='GetCupidProxyToken')
        token_request = task_param_pb.CupidProxyTokenRequest(
            instanceId=instance_id,
            appName=app_name,
            expiredInHours=expired_in_hours,
        )
        param = task_param_pb.CupidTaskParam(
            mcupidtaskoperator=operator,
            cupidProxyTokenRequest=token_request,
        )

        rpc_channel = CupidTaskServiceRpcChannel(self)
        submitted = rpc_channel.submit_job(param, 'eAsyncNotFuxiJob', session=self)
        return rpc_channel.wait_cupid_instance(submitted)

    def get_proxied_url(self, instance, app_name, expired_in_hours=None):
        """Return the proxied HTTP URL of an application in an instance.

        :param instance: instance object or instance id.
        :param app_name: application name forwarded to ``get_proxy_token``.
        :param expired_in_hours: token lifetime in hours; any falsy value
            is replaced with ``30 * 24``.
        :return: ``'http://<token>.<options.cupid.proxy_endpoint>'``.
        """
        lifetime = expired_in_hours or 30 * 24
        token = self.get_proxy_token(instance, app_name, lifetime)
        return 'http://%s.%s' % (token, options.cupid.proxy_endpoint)

    # NOTE(review): ``with_wait_argument`` comes from ``odps.utils``; it
    # presumably maps a ``wait`` keyword onto ``async_`` -- confirm there.
    @with_wait_argument
    def start_kubernetes(self, async_=False, priority=None, running_cluster=None,
                         proxy_endpoint=None, major_task_version=None,
                         app_command=None, app_image=None, resources=None, **kw):
        """Submit a ``startam`` task and return a handle to the result.

        :param async_: when true, return the submitted instance right after
            the first submission instead of waiting for the kube API.
        :param priority: job priority. When ``None``, ``options.priority``
            is used; if that is ``None`` too and ``options.get_priority`` is
            set, ``options.get_priority(self.odps)`` is used.
        :param running_cluster: when truthy, stored under
            ``odps.cupid.task.running.cluster``; always forwarded to
            ``submit_job``.
        :param proxy_endpoint: when not ``None``, written into the global
            ``options.cupid.proxy_endpoint``.
        :param major_task_version: when not ``None``, written into the
            global ``options.cupid.major_task_version``.
        :param app_command: value for ``odps.cupid.kube.appmaster.cmd``.
        :param app_image: value for ``odps.cupid.kube.appmaster.image``.
        :param resources: iterable of strings joined with ``','`` into
            ``odps.cupid.resources``.
        :param kw: only ``runtime_endpoint`` and ``task_name`` are accepted.
        :return: the submitted instance when ``async_`` is true, otherwise
            the result of ``get_instance_kube``.
        :raises ValueError: if ``kw`` holds any other key; the message names
            the first such key.
        :raises CupidMasterTimeoutError: if waiting for the kube API times
            out on each of three submissions.
        """
        if priority is None:
            priority = options.priority
        if priority is None and options.get_priority is not None:
            priority = options.get_priority(self.odps)

        engine_type = options.cupid.engine_running_type

        # These two overrides mutate process-wide options; ``job_conf`` and
        # ``get_proxied_url`` read them back from there.
        if proxy_endpoint is not None:
            options.cupid.proxy_endpoint = proxy_endpoint
        if major_task_version is not None:
            options.cupid.major_task_version = major_task_version

        runtime_endpoint = kw.pop('runtime_endpoint', None)
        task_name = kw.pop('task_name', None)
        operator = task_param_pb.CupidTaskOperator(moperator='startam', menginetype=engine_type)
        if kw:
            raise ValueError('Got unexpected arguments: {}'.format(next(iter(kw))))

        kube_conf = {
            'odps.cupid.kube.master.mode': options.cupid.kube.master_mode,
            'odps.cupid.master.type': options.cupid.master_type,
            'odps.cupid.engine.running.type': engine_type,
            'odps.cupid.job.capability.duration.hours': options.cupid.job_duration_hours,
            'odps.cupid.channel.init.timeout.seconds': options.cupid.channel_init_timeout_seconds,
            'odps.moye.runtime.type': options.cupid.application_type,
            'odps.runtime.end.point': runtime_endpoint or options.cupid.runtime.endpoint,
            'odps.cupid.resources': ','.join(resources or []),
            'odps.cupid.kube.appmaster.cmd': app_command,
            'odps.cupid.kube.appmaster.image': app_image,
        }
        if running_cluster:
            kube_conf['odps.cupid.task.running.cluster'] = running_cluster
        if options.cupid.container_node_label is not None:
            kube_conf['odps.cupid.container.node.label'] = options.cupid.container_node_label
        if options.cupid.master.virtual_resource is not None:
            kube_conf['odps.cupid.master.virtual.resource'] = options.cupid.master.virtual_resource

        param = task_param_pb.CupidTaskParam(
            jobconf=self.job_conf(conf=kube_conf),
            mcupidtaskoperator=operator,
        )

        # The job is submitted again after a master timeout; the timeout of
        # the last allowed attempt is propagated to the caller.
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            rpc_channel = CupidTaskServiceRpcChannel(self)
            submitted = rpc_channel.submit_job(
                param, 'eHasFuxiJob', with_resource=True, priority=priority,
                running_cluster=running_cluster, session=self, task_name=task_name,
            )
            if async_:
                return submitted
            try:
                return self.get_instance_kube(submitted)
            except CupidMasterTimeoutError:
                if attempt == max_attempts:
                    raise

    def get_instance_kube_api(self, inst, expired_in_hours=None):
        """Return the proxied kube API URL, resolving it on first use.

        The URL is stored in ``self._kube_url`` after the first successful
        call; later calls return the stored value without looking at
        ``inst`` or ``expired_in_hours`` again.

        :param inst: instance passed to ``wait_cupid_instance`` and
            ``get_proxied_url``.
        :param expired_in_hours: token lifetime in hours; any falsy value
            is replaced with ``30 * 24``.
        :return: the kube API URL string.
        """
        if self._kube_url is not None:
            return self._kube_url

        CupidTaskServiceRpcChannel.wait_cupid_instance(inst, 'running')
        lifetime = expired_in_hours or 30 * 24
        self._kube_url = self.get_proxied_url(inst, '', lifetime)
        return self._kube_url

    def get_instance_kube(self, inst, expired_in_hours=None):
        """Create a kubernetes ``ApiClient`` aimed at the instance's API.

        :param inst: instance forwarded to ``get_instance_kube_api``.
        :param expired_in_hours: forwarded to ``get_instance_kube_api``.
        :return: ``kubernetes.client.ApiClient`` whose configuration host is
            the URL from ``get_instance_kube_api``.
        """
        # function-local import, as in the original
        from kubernetes import client as kube_client

        kube_config = kube_client.Configuration()
        kube_config.host = self.get_instance_kube_api(inst, expired_in_hours)
        return kube_client.ApiClient(kube_config)
# The star-import of the sibling ``io`` module sits at the end of the file.
# ``__all__`` lists two table session classes that are not defined in this
# file, so they presumably come from this import -- confirm in ``io``.
from .io import *

__all__ = [
    'CupidSession',
    'CupidTableUploadSession',
    'CupidTableDownloadSession',
]