# cupid/runtime/runtime.py (92 lines of code) (raw):
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import os
import warnings
from odps.compat import enum
from ..utils import get_environ
from ..rpc import CupidRpcController, SandboxRpcChannel
try:
    from ..proto import cupid_process_service_pb2 as process_pb
except TypeError:
    # Importing the generated proto module raised TypeError; warn with an
    # upgrade hint and surface the failure to importers as ImportError.
    warnings.warn(
        'Cannot import protos from pycupid: '
        'consider upgrading your protobuf python package.',
        ImportWarning,
    )
    raise ImportError
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Cache of runtime contexts keyed by process id (see ``context()``), so each
# process that asks for a context gets its own instance.
_pid_to_context = {}
def context():
    """
    Return the RuntimeContext bound to the current process.

    The first call made in a given process runs ``Subprocess_Container_Init``
    and ``Subprocess_StartFDReceiver``, builds a ``RuntimeContext`` and caches
    it in ``_pid_to_context`` under ``os.getpid()``; later calls from the same
    process return the cached instance.

    :return: the cached ``RuntimeContext``, or ``None`` when the
        ``ctypes_libs`` helpers cannot be imported
    """
    try:
        from .ctypes_libs import Subprocess_Container_Init, Subprocess_StartFDReceiver
    except ImportError:
        # Native helpers are unavailable: there is no runtime to attach to.
        return None

    current_pid = os.getpid()
    if current_pid in _pid_to_context:
        return _pid_to_context[current_pid]

    # First request from this process: initialise the native side before
    # constructing the context object.
    Subprocess_Container_Init()
    Subprocess_StartFDReceiver()
    runtime_ctx = RuntimeContext()
    _pid_to_context[current_pid] = runtime_ctx
    return runtime_ctx
class ContainerStatus(enum.Enum):
    """
    Container status codes.

    NOTE(review): presumably the ``status`` argument expected by
    ``RuntimeContext.report_container_status``, which sends ``status.value``
    as a string -- confirm against callers.
    """

    START = 1
    TERMINATED = 2
    RUNNING = 3
class WorkItemProgress(enum.Enum):
    """
    Progress codes of a work item.

    NOTE(review): presumably the ``progress`` argument expected by
    ``RuntimeContext.report_container_status``, which sends ``progress.value``
    as a string -- confirm against callers.
    """

    # waiting for enough resource
    WIP_WAITING = 1
    # resource ready, but not running yet
    WIP_READY = 2
    WIP_RUNNING = 3
    # worker has tried to terminate the work item but is not yet confirmed
    WIP_TERMINATING = 4
    # worker confirms that the work item is terminated normally
    WIP_TERMINATED = 5
    # worker confirms that the work item has failed
    WIP_FAILED = 6
    WIP_INTERRUPTED = 7
    WIP_DEAD = 8
    WIP_INTERRUPTING = 9
class RuntimeContext(object):
    """
    Per-process handle on the Cupid runtime channel.

    Constructing an instance reads pipe and core counts through
    ``get_environ``, writes handler counts into a ``ChannelConf`` and starts a
    ``ChannelSlaveClient``. The remaining methods are thin wrappers that issue
    named synchronous calls through that client.
    """

    def __init__(self):
        from .ctypes_libs import ChannelConf, ChannelSlaveClient

        # init channels
        server_read_pipe_num = int(get_environ('serverReadPipeNum', 2))
        server_write_pipe_num = int(get_environ('serverWritePipeNum', 2))
        client_read_pipe_num = int(get_environ('clientReadPipeNum', 2))
        client_write_pipe_num = int(get_environ('clientWritePipeNum', 2))
        cpu_cores, rpc_handlers, stream_handlers = self._handler_counts(
            get_environ('workerCores', 2))

        # NOTE(review): channel_conf is not handed to the client below;
        # presumably ChannelConf writes into process-wide native
        # configuration -- confirm in ctypes_libs.
        channel_conf = ChannelConf()
        channel_conf.set_integer('odps.subprocess.channel.rpc.handler.count',
                                 rpc_handlers)
        channel_conf.set_integer('odps.subprocess.channel.stream.handler.count',
                                 stream_handlers)
        # Lazy %-style arguments: the message is only formatted when emitted.
        logger.info("serverReadPipeNum: %d, serverWritePipeNum: %d, clientReadPipeNum: %d, "
                    "clientWritePipeNum: %d, cpuCores: %d",
                    server_read_pipe_num, server_write_pipe_num, client_read_pipe_num,
                    client_write_pipe_num, cpu_cores)

        self._channel_client = ChannelSlaveClient(
            client_write_pipe_num, client_read_pipe_num, 'slave_client')
        self._channel_client.start()

    @staticmethod
    def _handler_counts(worker_cores):
        """
        Derive channel handler counts from the configured worker core count.

        The core count is clamped to a minimum of 1. The previous
        ``min(1, ...)`` capped it at 1 instead, which ignored the configured
        value and let zero or negative values through. Floor division keeps
        the RPC handler count an integer on Python 3.

        :param worker_cores: configured core count, anything ``int()`` accepts
        :return: tuple ``(cpu_cores, rpc_handler_count, stream_handler_count)``
        """
        cpu_cores = max(1, int(worker_cores))
        return cpu_cores, max(cpu_cores // 2, 1), cpu_cores * 3

    @staticmethod
    def is_context_ready():
        """
        Tell whether a context is already cached for the current process.

        :return: ``True`` if ``os.getpid()`` is a key of ``_pid_to_context``
        """
        return os.getpid() in _pid_to_context

    @property
    def channel_client(self):
        """The ``ChannelSlaveClient`` started in ``__init__``."""
        return self._channel_client

    def register_application(self, app_name, address):
        """
        Send a ``report_app_address`` call for an application.

        :param app_name: sent as the ``name`` field of the JSON payload
        :param address: sent as the ``address`` field of the JSON payload
        :return: whatever the channel client's ``sync_call`` returns
        """
        return self.channel_client.sync_call('report_app_address', json.dumps(
            dict(name=app_name, address=address)
        ))

    def get_bearer_token(self):
        """
        Send a ``get_bearer_token`` call with an empty payload.

        :return: whatever the channel client's ``sync_call`` returns
        """
        return self.channel_client.sync_call('get_bearer_token', '')

    def kv_store(self):
        """
        Get key-value store of Cupid Service

        The store is created on first use and cached on this instance.

        :return: kv-store instance
        """
        from ..io.kvstore import CupidKVStore
        if not hasattr(self, '_cupid_kv_store'):
            self._cupid_kv_store = CupidKVStore()
        return self._cupid_kv_store

    @staticmethod
    def prepare_channel():
        """
        Call ``Prepare`` on the process service over a sandbox RPC channel.

        :return: the ``entries`` field of the ``Prepare`` response
        """
        controller = CupidRpcController()
        channel = SandboxRpcChannel()
        stub = process_pb.ProcessService_Stub(channel)
        env = process_pb.EnvEntry()
        resp = stub.Prepare(controller, env, None)
        return resp.entries

    def report_container_status(self, status, message, progress, timeout=-1):
        """
        Send a ``report_container_status`` call.

        The payload is a JSON object whose ``status`` is ``str(status.value)``
        and whose ``message`` is itself a JSON-encoded object holding the
        given message (under ``status``) and ``str(progress.value)`` (under
        ``progress``).

        :param status: object exposing ``.value``
        :param message: value stored under the inner ``status`` key
        :param progress: object exposing ``.value``
        :param timeout: forwarded unchanged to ``sync_call``
        """
        params = json.dumps(dict(
            status=str(status.value),
            message=json.dumps(dict(
                status=message,
                progress=str(progress.value),
            )),
        ))
        self._channel_client.sync_call("report_container_status", params, timeout=timeout)