odps/mars_extension/oscar/deploy/client.py (320 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import json
import logging
import time
import requests
import warnings
from mars import new_session
from ....models import Instance
from ....config import options
from .... import errors
from ...utils import get_default_resource_files, build_mars_image_name
# Module-level logger used for logview / endpoint announcements.
logger = logging.getLogger(__name__)

# Application names handed to ``CupidSession.get_proxied_url`` to look up the
# proxied URL of each service running in the instance.
CUPID_APP_NAME = "MarsWeb"
NOTEBOOK_NAME = "MarsNotebook"
GS_COORDINATOR_NAME = "GSCoordinator"

# Resource names resolved through ``get_default_resource_files`` when the
# caller of ``MarsCupidClient.submit`` does not pass ``default_resources``.
DEFAULT_RESOURCES = [
    "pymars-0.9.0",
    "pyodps-0.11.5",
    "pyarrow-4.0.0",
]
class MarsCupidClient:
    """Client-side handle for a Mars cluster running in a Cupid kubernetes instance.

    The client either starts a new instance through :meth:`submit` or works
    with an existing one passed as ``inst``.  It polls the proxied Mars web
    endpoint until the service answers (:meth:`wait_for_success`), optionally
    creates a default Mars session, and can terminate the instance again
    (:meth:`stop_server`).
    """

    def __init__(self, odps, inst=None, project=None):
        """Create a client bound to an ODPS entry object.

        :param odps: ODPS entry object; its ``account`` and ``project`` are
            used when signing requests for the common proxy.
        :param inst: existing kubernetes instance to work with, or ``None``
            to have :meth:`submit` start one.
        :param project: project forwarded to ``CupidSession``.
        """
        # cupid is imported at call time rather than at module import time.
        from cupid import CupidSession

        self._odps = odps
        self._cupid_session = CupidSession(odps, project=project)
        self._kube_instance = inst
        self._kube_url = None
        self._kube_client = None
        self._kube_namespace = None

        self._supervisor_key = None
        self._supervisor_config = None
        self._worker_config = None

        self._endpoint = None
        self._with_notebook = False
        self._notebook_endpoint = None
        self._with_graphscope = False
        self._graphscope_endpoint = None

        self._mars_session = None
        self._req_session = None

    @property
    def endpoint(self):
        """Proxied URL of the Mars web service, or ``None`` if not resolved yet."""
        return self._endpoint

    @property
    def notebook_endpoint(self):
        """Proxied URL of the notebook service, or ``None`` if not resolved yet."""
        return self._notebook_endpoint

    @property
    def session(self):
        """Mars session created by :meth:`wait_for_success`, or ``None``."""
        return self._mars_session

    @property
    def instance_id(self):
        """Id of the underlying kubernetes instance."""
        return self._kube_instance.id

    def rewrite_mars_request(self, request):
        """Request rewriter handed to ``new_session``.

        Sets the User-Agent header, supplies a default Content-Type, and,
        when ``options.mars.use_common_proxy`` is on, appends the current
        project as a ``curr_project`` query argument and signs the request
        with the account of the request session.

        :param request: outgoing request object; modified in place.
        :return: the same ``request`` object.
        """
        from ....rest import default_user_agent

        request.headers["User-Agent"] = request.user_agent = default_user_agent()
        if "Content-Type" not in request.headers:
            request.headers["Content-Type"] = "application/octet-stream"

        if options.mars.use_common_proxy:
            if self._odps.project is not None:
                project_query = "curr_project=" + self._odps.project
                separator = "&" if "?" in request.url else "?"
                request.url += separator + project_query
            account = self._req_session.account
            account.sign_request(request, self._endpoint)
        return request

    def submit(
        self,
        image=None,
        supervisor_num=1,
        supervisor_cpu=4,
        supervisor_mem=16 * 1024**3,
        worker_num=1,
        worker_cpu=8,
        worker_mem=32 * 1024**3,
        worker_cache_mem=None,
        min_worker_num=None,
        worker_disk_num=1,
        worker_disk_size=100 * 1024**3,
        with_notebook=False,
        notebook_cpu=1,
        notebook_mem=2 * 1024**3,
        with_graphscope=False,
        coordinator_cpu=1,
        coordinator_mem=2 * 1024**3,
        timeout=None,
        extra_env=None,
        extra_modules=None,
        resources=None,
        create_session=True,
        priority=None,
        running_cluster=None,
        task_name=None,
        **kw
    ):
        """Start the kubernetes instance (if none is attached) and wait for Mars.

        The sizing arguments (``supervisor_*``, ``worker_*``, ``notebook_*``,
        ``coordinator_*``), ``image``, ``timeout``, ``extra_env`` and
        ``extra_modules`` are serialized as JSON, base64-encoded and passed on
        the command line of the application container.

        :param worker_cache_mem: defaults to 40% of ``worker_mem`` when ``None``.
        :param min_worker_num: number of workers to wait for; falls back to
            ``worker_num`` when falsy.
        :param with_notebook: when ``None``, ``options.mars.launch_notebook``
            decides whether the client waits for a notebook endpoint.
        :param extra_modules: a name or list/tuple of names;
            ``"odps.mars_extension"`` is always appended.
        :param resources: a resource or list/tuple of resources; the default
            resources are always appended.
        :param create_session: forwarded to :meth:`wait_for_success`.
        :param priority: forwarded to ``start_kubernetes``.
        :param running_cluster: forwarded to ``start_kubernetes``.
        :param task_name: forwarded to ``start_kubernetes``.
        :param kw: ``async_``, ``mars_image``, ``default_resources``,
            ``instance_idle_timeout`` and ``node_blacklist`` are consumed
            here; everything else is forwarded to ``start_kubernetes``.
        :return: this client.  With ``async_`` truthy the method returns
            right after submission without waiting.  On ``KeyboardInterrupt``
            the server is stopped and the client is still returned.
        """
        try:
            async_ = kw.pop("async_", None)

            # compatible with early version
            mars_image = kw.pop("mars_image", None)
            default_resources = kw.pop(
                "default_resources", None
            ) or get_default_resource_files(DEFAULT_RESOURCES)
            instance_idle_timeout = kw.pop("instance_idle_timeout", None)
            node_blacklist = kw.pop("node_blacklist", None)

            if with_notebook is not None:
                self._with_notebook = bool(with_notebook)
            else:
                self._with_notebook = options.mars.launch_notebook
            if with_graphscope is not None:
                self._with_graphscope = bool(with_graphscope)

            if self._kube_instance is None:
                image = image or build_mars_image_name(mars_image)

                extra_modules = extra_modules or []
                if isinstance(extra_modules, (tuple, list)):
                    extra_modules = list(extra_modules) + ["odps.mars_extension"]
                else:
                    extra_modules = [extra_modules, "odps.mars_extension"]

                # Always build a fresh list so neither the caller's sequence
                # nor the default resources are mutated, and so that a tuple
                # of default resources can be concatenated as well.
                if resources is None:
                    resources = default_resources
                elif isinstance(resources, (tuple, list)):
                    resources = list(resources) + list(default_resources)
                else:
                    resources = [resources] + list(default_resources)

                if worker_cache_mem is None:
                    worker_cache_mem = int(worker_mem * 0.4)

                # NOTE(review): the raw ``with_notebook`` argument is sent,
                # not the resolved ``self._with_notebook``; with
                # ``with_notebook=None`` the two may disagree -- confirm how
                # the server side treats ``None``.
                cluster_args = dict(
                    image=image,
                    supervisor_num=supervisor_num,
                    supervisor_cpu=supervisor_cpu,
                    supervisor_mem=supervisor_mem,
                    worker_num=worker_num,
                    worker_cpu=worker_cpu,
                    worker_mem=worker_mem,
                    worker_cache_mem=worker_cache_mem,
                    min_worker_num=min_worker_num,
                    worker_disk_num=worker_disk_num,
                    worker_disk_size=worker_disk_size,
                    with_notebook=with_notebook,
                    notebook_cpu=notebook_cpu,
                    notebook_mem=notebook_mem,
                    with_graphscope=with_graphscope,
                    coordinator_cpu=coordinator_cpu,
                    coordinator_mem=coordinator_mem,
                    extra_env=extra_env,
                    extra_modules=extra_modules,
                    node_blacklist=node_blacklist,
                    instance_idle_timeout=instance_idle_timeout,
                    timeout=timeout,
                )
                # The app module lives next to this one; arguments travel as
                # base64-encoded JSON on the command line.
                command = "/srv/entrypoint.sh %s %s" % (
                    __name__.rsplit(".", 1)[0] + ".app",
                    base64.b64encode(json.dumps(cluster_args).encode()).decode(),
                )

                self._kube_instance = self._cupid_session.start_kubernetes(
                    async_=True,
                    running_cluster=running_cluster,
                    priority=priority,
                    app_image=build_mars_image_name(),
                    app_command=command,
                    resources=resources,
                    task_name=task_name,
                    **kw
                )
                logger.info(self._kube_instance.get_logview_address())

            if async_:
                return self

            self.wait_for_success(
                create_session=create_session,
                min_worker_num=min_worker_num or worker_num,
            )
            return self
        except KeyboardInterrupt:
            self.stop_server()
            return self

    def check_service_ready(self, timeout=1):
        """Probe ``<endpoint>/api`` and report whether the service answers.

        :param timeout: timeout passed to the request session's ``get``.
        :return: ``False`` on connection errors, timeouts, ODPS errors, a
            status code of 400 or above, or a body containing ``not found``
            (in which case the cached endpoint is also reset so that it is
            resolved again); ``True`` otherwise.
        """
        try:
            resp = self._req_session.get(
                self._endpoint.rstrip("/") + "/api", timeout=timeout
            )
        except (requests.ConnectionError, requests.Timeout, errors.ODPSError):
            return False

        if resp.status_code >= 400:
            return False
        if b"not found" in resp.content:
            # Drop the cached endpoint so the wait loop resolves it again.
            self._endpoint = None
            return False
        return True

    def count_workers(self):
        """Return the number of entries under ``nodes`` reported by the cluster API."""
        resp = self._req_session.get(
            self._endpoint.rstrip("/") + "/api/cluster/nodes?role=1", timeout=1
        )
        json_result = json.loads(resp.text)
        return len(json_result["nodes"])

    def rescale_workers(self, new_scale, min_workers=None, wait=True, timeout=None):
        """Delegate a worker rescale request to the underlying Mars session."""
        self._mars_session._sess.rescale_workers(
            new_scale, min_workers=min_workers, wait=wait, timeout=timeout
        )

    def get_logview_address(self):
        """Return the logview address of the kubernetes instance."""
        return self._kube_instance.get_logview_address()

    def get_mars_endpoint(self):
        """Return the proxied URL of the Mars web application."""
        return self._cupid_session.get_proxied_url(
            self._kube_instance.id, CUPID_APP_NAME
        )

    def get_notebook_endpoint(self):
        """Return the proxied URL of the notebook application."""
        return self._cupid_session.get_proxied_url(
            self._kube_instance.id, NOTEBOOK_NAME
        )

    def get_graphscope_endpoint(self):
        """Return the proxied URL of the graphscope coordinator."""
        return self._cupid_session.get_proxied_url(
            self._kube_instance.id, GS_COORDINATOR_NAME
        )

    def get_req_session(self):
        """Build the HTTP session used to talk to the Mars endpoint.

        :return: a ``RestClient`` bound to the ODPS account and current
            endpoint when ``options.mars.use_common_proxy`` is on, otherwise
            a plain ``requests.Session``.
        """
        from ....rest import RestClient

        if options.mars.use_common_proxy:
            return RestClient(
                self._odps.account, self._endpoint, self._odps.project, tag="MARS"
            )
        return requests.Session()

    def check_instance_status(self):
        """Raise if the instance has terminated with a non-successful task.

        :raises: the parsed instance error for a failed task, or
            ``errors.ODPSError`` for any other non-success status; the
            exception carries the instance id as ``instance_id``.
        """
        if not self._kube_instance.is_terminated():
            return

        for task_name, task in (self._kube_instance.get_task_statuses()).items():
            exc = None
            if task.status == Instance.Task.TaskStatus.FAILED:
                exc = errors.parse_instance_error(
                    self._kube_instance.get_task_result(task_name)
                )
            elif task.status != Instance.Task.TaskStatus.SUCCESS:
                exc = errors.ODPSError(
                    "%s, status=%s" % (task_name, task.status.value)
                )
            if exc:
                exc.instance_id = self._kube_instance.id
                raise exc

    def _post_pyodps_api(self, **data):
        """POST ``data`` to ``<endpoint>/api/pyodps``.

        ``errors.InvalidStateSetting`` is ignored when the instance has
        already finished successfully; otherwise it is re-raised.
        """
        r = self._req_session.post(
            self._endpoint.rstrip("/") + "/api/pyodps", data=data
        )
        try:
            r.raise_for_status()
        except errors.InvalidStateSetting:
            if not self._kube_instance.is_successful():
                raise

    def _discover_endpoints(self):
        """Resolve every endpoint that is still unknown and post it as a ``write_log`` action."""
        if self._endpoint is None:
            self._endpoint = self.get_mars_endpoint()
            logger.info("Mars UI: %s", self._endpoint)
            # The request session depends on the endpoint, so it is built
            # only after the endpoint is known.
            self._req_session = self.get_req_session()
            self._post_pyodps_api(
                action="write_log",
                content="Mars UI from client: " + self._endpoint,
            )
        if self._with_notebook and self._notebook_endpoint is None:
            self._notebook_endpoint = self.get_notebook_endpoint()
            logger.info("Notebook UI: %s", self._notebook_endpoint)
            self._post_pyodps_api(
                action="write_log",
                content="Notebook UI from client: " + self._notebook_endpoint,
            )
        if self._with_graphscope and self._graphscope_endpoint is None:
            self._graphscope_endpoint = self.get_graphscope_endpoint()
            logger.info("Graphscope endpoint: %s", self._graphscope_endpoint)
            self._post_pyodps_api(
                action="write_log",
                content="Graphscope endpoint from client: "
                + self._graphscope_endpoint,
            )

    def wait_for_success(self, min_worker_num=0, create_session=True):
        """Block until the Mars service is ready with enough workers.

        Each round first checks that the instance has not terminated
        abnormally, resolves missing endpoints, then probes the service and
        counts workers.  Ordinary errors while resolving or probing are
        retried after a one-second pause; ``KeyboardInterrupt`` and
        ``SystemExit`` always propagate so the wait can be interrupted.

        :param min_worker_num: number of workers that must be reported
            before the wait ends.
        :param create_session: when true, create a Mars session against the
            endpoint and make it the default one.  If that fails while the
            instance is still running, the instance is stopped before the
            error is re-raised.
        """
        while True:
            self.check_instance_status()
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                try:
                    self._discover_endpoints()
                except Exception:
                    time.sleep(1)
                    continue

                try:
                    if not self.check_service_ready():
                        continue
                    if self.count_workers() >= min_worker_num:
                        break
                except Exception:
                    # Only ordinary errors are retried; interrupts must be
                    # able to leave the polling loop.
                    continue
                finally:
                    time.sleep(1)

        if create_session:
            try:
                self._mars_session = new_session(
                    self._endpoint, request_rewriter=self.rewrite_mars_request
                ).as_default()
            except KeyboardInterrupt:
                raise
            except BaseException:
                if (
                    self._kube_instance
                    and self._kube_instance.status == self._kube_instance.Status.RUNNING
                ):
                    self._kube_instance.stop()
                raise

    def restart_session(self):
        """Close the current Mars session, if any, and create a new default one."""
        if self._mars_session is not None:
            self._mars_session.close()
        self._mars_session = new_session(
            self._endpoint, request_rewriter=self.rewrite_mars_request
        ).as_default()

    def stop_server(self):
        """Ask the server to terminate and detach from the instance.

        A ``terminate`` action is posted and the instance is awaited for up
        to ``options.mars.container_status_timeout``.  If any of that fails
        and the instance has not terminated, it is stopped directly.  The
        client forgets the instance in every case.
        """
        if not self._kube_instance:
            return
        try:
            self._post_pyodps_api(action="terminate", message="Stopped at client side")
            self._kube_instance.wait_for_completion(
                timeout=options.mars.container_status_timeout
            )
        except BaseException:
            # Deliberately broad: shutting down is best effort, and a failed
            # graceful termination falls back to stopping the instance.
            if not self._kube_instance.is_terminated():
                try:
                    self._kube_instance.stop()
                except errors.InvalidStateSetting:
                    pass
        finally:
            self._kube_instance = None