odps/mars_extension/oscar/cupid_service.py

# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import os
import pickle
import socket
import struct
import sys
import time
from concurrent.futures import ThreadPoolExecutor

from mars.lib.tblib import pickling_support

from ...compat import six
from ..utils import filter_partitions, check_partition_exist

pickling_support.install()
logger = logging.getLogger(__name__)

REQUEST_TYPE_READ_TABLE_DATA = 0
REQUEST_TYPE_WRITE_TABLE_DATA = 1
REQUEST_TYPE_ENUM_TABLE_PARTITIONS = 2
REQUEST_TYPE_CREATE_TABLE_DOWNLOAD_SESSION = 3
REQUEST_TYPE_CREATE_TABLE_UPLOAD_SESSION = 4
REQUEST_TYPE_COMMIT_TABLE_UPLOAD_SESSION = 5
REQUEST_TYPE_GET_KV = 6
REQUEST_TYPE_PUT_KV = 7
REQUEST_TYPE_TERMINATE_INSTANCE = 8
REQUEST_TYPE_GET_BEARER_TOKEN = 9
REQUEST_TYPE_REPORT_CONTAINER_STATUS = 10

CHUNK_BYTES_LIMIT = 64 * 1024**2
TRANSFER_BLOCK_SIZE = 64 * 1024**2
MAX_CHUNK_NUM = 512 * 1024**2


def _create_arrow_writer(sink, schema, **kwargs):
    import pyarrow as pa

    try:
        return pa.ipc.new_stream(sink, schema, **kwargs)
    except AttributeError:
        return pa.ipc.RecordBatchStreamWriter(sink, schema, **kwargs)


def _write_request_result(sock, success=True, result=None, exc_info=None):
    try:
        result_dict = {
            "status": success,
            "result": result,
            "exc_info": exc_info,
        }
        pickled = pickle.dumps(result_dict)
        sock_out_file = sock.makefile("wb")
        sock_out_file.write(struct.pack("<I", len(pickled)))
        sock_out_file.write(pickled)
    finally:
        sock_out_file.flush()
        sock_out_file.close()


def _handle_read_table_data(sock):
    from cupid.io.table import TableSplit
    from cupid.errors import SubprocessStreamEOFError

    sock_out_file = sock.makefile("wb")
    ipc_writer = None
    try:
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        read_config = pickle.loads(sock.recv(cmd_len))
        min_rows = read_config.pop("min_rows", None)

        tsp = TableSplit(**read_config)
        logger.debug(
            "Read split table, split index: %s", read_config["_split_index"]
        )
        read_config = None

        if min_rows is None:
            reader = tsp.open_arrow_file_reader()
            while True:
                chunk = reader.read(TRANSFER_BLOCK_SIZE)
                if len(chunk) == 0:
                    break
                try:
                    sock_out_file.write(chunk)
                except (BrokenPipeError, SubprocessStreamEOFError):
                    break
                finally:
                    chunk = None
        else:
            reader = tsp.open_arrow_reader()
            nrows = 0
            while min_rows is None or nrows < min_rows:
                try:
                    batch = reader.read_next_batch()
                    nrows += batch.num_rows
                    if ipc_writer is None:
                        ipc_writer = _create_arrow_writer(sock_out_file, batch.schema)
                    ipc_writer.write_batch(batch)
                except StopIteration:
                    break
                finally:
                    batch = None
            if ipc_writer is not None:  # pragma: no branch
                ipc_writer.close()
                ipc_writer = None
    except:
        logger.exception("Failed to read table")
    finally:
        if ipc_writer is not None:
            ipc_writer.close()
        sock_out_file.flush()
        sock_out_file.close()
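
# ---------------------------------------------------------------------------
# Wire protocol notes, summarized from the handlers in this module: every
# request carries a 4-byte little-endian request type, then a 4-byte
# little-endian payload length, then a pickled dict.  Responses produced by
# _write_request_result are a 4-byte length followed by a pickled
# {"status", "result", "exc_info"} dict.  READ_TABLE_DATA and
# WRITE_TABLE_DATA additionally stream Arrow data over the same socket.
# A client could frame a GET_KV request roughly as below; this is only an
# illustrative sketch with a hypothetical key, and
# CupidServiceClient._send_cupid_service_request further down is the real
# client-side framing:
#
#     payload = pickle.dumps({"key": "some_key"})
#     sock.sendall(struct.pack("<I", REQUEST_TYPE_GET_KV))
#     sock.sendall(struct.pack("<I", len(payload)) + payload)
# ---------------------------------------------------------------------------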


def _handle_write_table_data(sock):
    import pyarrow as pa
    from cupid.io.table.core import BlockWriter

    try:
        sock_in_file = sock.makefile("rb")
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        writer_config = pickle.loads(sock.recv(cmd_len))

        block_writer = BlockWriter(**writer_config)
        logger.debug(
            "Start writing table block, block id: %s", writer_config["_block_id"]
        )
        ipc_reader = pa.ipc.open_stream(sock_in_file)
        with block_writer.open_arrow_writer() as cupid_writer:
            arrow_writer = pa.RecordBatchStreamWriter(cupid_writer, ipc_reader.schema)

            while True:
                try:
                    batch = ipc_reader.read_next_batch()
                    arrow_writer.write_batch(batch)
                except StopIteration:
                    break
            arrow_writer.close()

        logger.debug(
            "Write table block finished, block id: %s", writer_config["_block_id"]
        )
        block_writer.commit()
        _write_request_result(sock, result={"block_id": writer_config["_block_id"]})
    except:
        logger.exception("Failed to write table data")
        _write_request_result(sock, False, exc_info=sys.exc_info())


def _handle_enum_table_partitions(sock):
    try:
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        # dict with odps_params, table_name, partition
        task_config = pickle.loads(sock.recv(cmd_len))

        from odps import ODPS
        from odps.accounts import BearerTokenAccount
        from cupid import context

        cupid_ctx = context()

        odps_params = task_config["odps_params"]
        bearer_token = cupid_ctx.get_bearer_token()
        account = BearerTokenAccount(bearer_token)
        project = os.environ.get("ODPS_PROJECT_NAME", None) or odps_params["project"]
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(None, None, account=account, project=project, endpoint=endpoint)

        table = o.get_table(task_config["table_name"])
        partition_desc = task_config.get("partition")
        if not table.table_schema.partitions:
            _write_request_result(sock, result=None)
        elif partition_desc:
            if check_partition_exist(table, partition_desc):
                _write_request_result(sock, result=[partition_desc])
            else:
                parts = filter_partitions(o, list(table.partitions), partition_desc)
                _write_request_result(
                    sock, result=[str(pt.partition_spec) for pt in parts]
                )
        else:
            _write_request_result(
                sock, result=[str(pt.partition_spec) for pt in table.partitions]
            )
    except:
        logger.exception("Failed to enumerate table partitions")
        _write_request_result(sock, False, exc_info=sys.exc_info())
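
# Result semantics of _handle_enum_table_partitions, for reference: the
# handler replies with None when the table is not partitioned, with
# [partition_desc] when the requested partition exists, with the partition
# specs matched by filter_partitions otherwise, and with every partition
# spec when no partition was requested.  An illustrative call through the
# client defined below (table and partition names are hypothetical):
#
#     CupidServiceClient().enum_table_partitions(
#         odps_params, "my_table", partition="pt=20220101"
#     )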


def _handle_create_table_download_session(sock):
    try:
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        # dict with odps_params, table_name, partition, columns,
        # worker_count, split_size, max_chunk_num
        session_config = pickle.loads(sock.recv(cmd_len))

        from odps import ODPS
        from odps.errors import ODPSError
        from odps.accounts import BearerTokenAccount
        from cupid import CupidSession, context
        from cupid.errors import CupidError
        from cupid.runtime import RuntimeContext

        if not RuntimeContext.is_context_ready():
            raise SystemError(
                "No Mars cluster found, please create via `o.create_mars_cluster`."
            )

        cupid_ctx = context()

        odps_params = session_config["odps_params"]
        bearer_token = cupid_ctx.get_bearer_token()
        account = BearerTokenAccount(bearer_token)
        project = os.environ.get("ODPS_PROJECT_NAME", None) or odps_params["project"]
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(None, None, account=account, project=project, endpoint=endpoint)
        cupid_session = CupidSession(o)

        split_size = session_config["split_size"]
        table_name = session_config["table_name"]
        data_src = o.get_table(table_name)
        if session_config.get("partition") is not None:
            data_src = data_src.get_partition(session_config["partition"])

        try:
            data_store_size = data_src.size
        except ODPSError:
            # fail to get data size, just ignore
            pass
        else:
            worker_count = session_config["worker_count"]
            if data_store_size < split_size and worker_count is not None:
                # data is too small, split as many as number of cores
                split_size = data_store_size // worker_count
                # at least 1M
                split_size = max(split_size, 1 * 1024**2)
                logger.debug(
                    "Input data size is too small, split_size is {}".format(split_size)
                )

        max_chunk_num = session_config["max_chunk_num"]
        columns = session_config["columns"]
        with_split_meta = session_config.get("with_split_meta_on_tile")

        logger.debug(
            "Start creating download session of table %s from cupid, columns %r",
            table_name,
            columns,
        )
        while True:
            try:
                download_session = cupid_session.create_download_session(
                    data_src,
                    split_size=split_size,
                    columns=columns,
                    with_split_meta=with_split_meta,
                )
                break
            except CupidError:
                logger.debug(
                    "The number of splits exceeds 100000, split_size is {}".format(
                        split_size
                    )
                )
                if split_size >= max_chunk_num:
                    raise
                else:
                    split_size *= 2

        ret_data = {
            "splits": download_session.splits,
            "split_size": split_size,
        }
        _write_request_result(sock, result=ret_data)
    except:
        logger.exception("Failed to create download session")
        _write_request_result(sock, False, exc_info=sys.exc_info())
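
# Split sizing in _handle_create_table_download_session, with illustrative
# numbers rather than any real workload: for a 32 MB table read by 16
# workers with the default 64 MB split_size, the table is smaller than
# split_size, so split_size becomes max(32 MB // 16, 1 MB) = 2 MB.  When
# cupid rejects a session because there are too many splits (CupidError),
# split_size is doubled and the request is retried until split_size reaches
# max_chunk_num, at which point the error propagates.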


def _handle_create_table_upload_session(sock):
    try:
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        # dict with odps_params, table_name
        session_config = pickle.loads(sock.recv(cmd_len))

        from odps import ODPS
        from odps.accounts import BearerTokenAccount
        from cupid import CupidSession, context
        from cupid.runtime import RuntimeContext

        if not RuntimeContext.is_context_ready():
            raise SystemError(
                "No Mars cluster found, please create via `o.create_mars_cluster`."
            )

        cupid_ctx = context()

        odps_params = session_config["odps_params"]
        bearer_token = cupid_ctx.get_bearer_token()
        account = BearerTokenAccount(bearer_token)
        project = os.environ.get("ODPS_PROJECT_NAME", None) or odps_params["project"]
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(None, None, account=account, project=project, endpoint=endpoint)
        cupid_session = CupidSession(o)

        data_src = o.get_table(session_config["table_name"])

        logger.debug("Start creating upload session from cupid.")
        upload_session = cupid_session.create_upload_session(data_src)

        ret_data = {
            "handle": upload_session.handle,
        }
        _write_request_result(sock, result=ret_data)
    except:
        logger.exception("Failed to create upload session")
        _write_request_result(sock, False, exc_info=sys.exc_info())


def _handle_commit_table_upload_session(sock):
    try:
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        # dict with odps_params, table_name, cupid_handle, blocks, overwrite
        commit_config = pickle.loads(sock.recv(cmd_len))

        from odps import ODPS
        from odps.accounts import BearerTokenAccount
        from cupid import CupidSession, context
        from cupid.runtime import RuntimeContext
        from cupid.io.table import CupidTableUploadSession

        if not RuntimeContext.is_context_ready():
            raise SystemError(
                "No Mars cluster found, please create via `o.create_mars_cluster`."
            )

        cupid_ctx = context()

        odps_params = commit_config["odps_params"]
        bearer_token = cupid_ctx.get_bearer_token()
        account = BearerTokenAccount(bearer_token)
        project = os.environ.get("ODPS_PROJECT_NAME", None) or odps_params["project"]
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(None, None, account=account, project=project, endpoint=endpoint)
        cupid_session = CupidSession(o)

        table = o.get_table(commit_config["table_name"])
        upload_session = CupidTableUploadSession(
            session=cupid_session,
            table_name=table.name,
            project_name=table.project.name,
            handle=commit_config["cupid_handle"],
            blocks=commit_config["blocks"],
        )
        upload_session.commit(overwrite=commit_config["overwrite"])
        _write_request_result(sock)
    except:
        logger.exception("Failed to commit upload session")
        _write_request_result(sock, False, exc_info=sys.exc_info())


def _handle_get_kv(sock):
    try:
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        # dict with key
        cmd_body = pickle.loads(sock.recv(cmd_len))

        from cupid.runtime import RuntimeContext

        if not RuntimeContext.is_context_ready():
            logger.warning("Cupid context not ready")
            value = None
        else:
            from cupid import context

            cupid_kv = context().kv_store()
            value = cupid_kv.get(cmd_body["key"])

        ret_data = {
            "value": value,
        }
        _write_request_result(sock, result=ret_data)
    except:
        logger.exception("Failed to get kv value")
        _write_request_result(sock, False, exc_info=sys.exc_info())


def _handle_put_kv(sock):
    try:
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        # dict with key, value
        cmd_body = pickle.loads(sock.recv(cmd_len))

        from cupid.runtime import RuntimeContext

        if not RuntimeContext.is_context_ready():
            logger.warning("Cupid context not ready")
        else:
            from cupid import context

            cupid_kv = context().kv_store()
            cupid_kv[cmd_body["key"]] = cmd_body["value"]

        _write_request_result(sock)
    except:
        logger.exception("Failed to put kv value")
        _write_request_result(sock, False, exc_info=sys.exc_info())
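
# The KV handlers above are mirrored by CupidServiceClient.get_kv/put_kv
# further down.  An illustrative round trip through the client (the key and
# value are hypothetical):
#
#     client = CupidServiceClient()
#     client.put_kv("mars_web_address", "http://worker-0:7104")
#     client.get_kv("mars_web_address")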


def _handle_terminate_instance(sock):
    from cupid.runtime import context, RuntimeContext
    from odps import ODPS
    from odps.accounts import BearerTokenAccount

    try:
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        # dict with instance_id
        cmd_body = pickle.loads(sock.recv(cmd_len))
        instance_id = cmd_body["instance_id"]

        if not RuntimeContext.is_context_ready():
            logger.warning("Cupid context not ready")
        else:
            bearer_token = context().get_bearer_token()
            account = BearerTokenAccount(bearer_token)
            project = os.environ["ODPS_PROJECT_NAME"]
            endpoint = os.environ["ODPS_RUNTIME_ENDPOINT"]
            o = ODPS(None, None, account=account, project=project, endpoint=endpoint)

            o.stop_instance(instance_id)
    except:
        logger.exception("Failed to terminate instance")
        _write_request_result(sock, False, exc_info=sys.exc_info())


def _handle_get_bearer_token(sock):
    try:
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        # request payload is unused; the client sends an empty dict
        _commit_config = pickle.loads(sock.recv(cmd_len))  # noqa: F841

        from cupid import context

        bearer_token = context().get_bearer_token()
        _write_request_result(sock, result={"token": bearer_token})
    except:
        logger.exception("Failed to get bearer token")
        _write_request_result(sock, False, exc_info=sys.exc_info())


def _handle_report_container_status(sock):
    try:
        (cmd_len,) = struct.unpack("<I", sock.recv(4))
        # dict with status, message, progress, timeout
        cmd_body = pickle.loads(sock.recv(cmd_len))

        status = cmd_body["status"]
        message = cmd_body["message"]
        progress = cmd_body["progress"]
        timeout = cmd_body["timeout"]

        from cupid import context

        context().report_container_status(status, message, progress, timeout=timeout)
    except:
        logger.exception("Failed to report container status")
        _write_request_result(sock, False, exc_info=sys.exc_info())


_request_handlers = {
    REQUEST_TYPE_READ_TABLE_DATA: _handle_read_table_data,
    REQUEST_TYPE_WRITE_TABLE_DATA: _handle_write_table_data,
    REQUEST_TYPE_ENUM_TABLE_PARTITIONS: _handle_enum_table_partitions,
    REQUEST_TYPE_CREATE_TABLE_DOWNLOAD_SESSION: _handle_create_table_download_session,
    REQUEST_TYPE_CREATE_TABLE_UPLOAD_SESSION: _handle_create_table_upload_session,
    REQUEST_TYPE_COMMIT_TABLE_UPLOAD_SESSION: _handle_commit_table_upload_session,
    REQUEST_TYPE_GET_KV: _handle_get_kv,
    REQUEST_TYPE_PUT_KV: _handle_put_kv,
    REQUEST_TYPE_TERMINATE_INSTANCE: _handle_terminate_instance,
    REQUEST_TYPE_GET_BEARER_TOKEN: _handle_get_bearer_token,
    REQUEST_TYPE_REPORT_CONTAINER_STATUS: _handle_report_container_status,
}


def _handle_requests(sock):
    while True:
        try:
            (req_type,) = struct.unpack("<I", sock.recv(4))
            if req_type in _request_handlers:
                _request_handlers[req_type](sock)
            else:
                sock.close()
                break
        except ConnectionAbortedError:
            break


def _prepare_channel(channel_file):
    while not os.path.exists(channel_file):
        time.sleep(1)
    try:
        with open(channel_file, "r") as env_file:
            envs = json.loads(env_file.read())
    except:
        time.sleep(1)
        with open(channel_file, "r") as env_file:
            envs = json.loads(env_file.read())

    from cupid import context

    os.environ.update(envs)
    context()
    odps_envs = {
        "ODPS_BEARER_TOKEN": os.environ["BEARER_TOKEN_INITIAL_VALUE"],
        "ODPS_ENDPOINT": os.environ["ODPS_RUNTIME_ENDPOINT"],
    }
    os.environ.update(odps_envs)
    logger.info("Started channel for Cupid Server.")


def run_cupid_service(channel_file, sock_file=None, pool_size=None):
    _prepare_channel(channel_file)

    pool = ThreadPoolExecutor(pool_size)
    sock_file = sock_file or os.environ["CUPID_SERVICE_SOCKET"]

    logger.warning("Starting Cupid Service with socket %s", sock_file)
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.bind(sock_file)
    sock.listen(1)
    while True:
        conn, _ = sock.accept()
        pool.submit(_handle_requests, conn)
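
# run_cupid_service is the server-side entry point: _prepare_channel polls
# channel_file (a JSON file of environment variables produced elsewhere),
# then the UNIX socket accept loop dispatches each connection to
# _handle_requests on a thread pool.  An illustrative launch; both file
# paths are hypothetical:
#
#     run_cupid_service("/tmp/cupid_channel.json", sock_file="/tmp/cupid.sock")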


class CupidServiceClient:
    def __init__(self, sock_file=None):
        self._sock_file = sock_file or os.environ["CUPID_SERVICE_SOCKET"]
        self._sock = None

    def __del__(self):
        self.close()

    def close(self):
        if self._sock is not None:
            self._sock.close()
            self._sock = None

    @property
    def sock(self):
        if self._sock is None or self._sock._closed:
            if not os.path.exists(self._sock_file):
                raise OSError("Socket file does not exist")
            self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self._sock.connect(self._sock_file)
        return self._sock

    def _send_cupid_service_request(self, req_type, arg_object):
        ser = pickle.dumps(arg_object)
        sock_out_file = self.sock.makefile("wb")
        sock_out_file.write(struct.pack("<I", req_type))
        sock_out_file.write(struct.pack("<I", len(ser)))
        sock_out_file.write(ser)
        sock_out_file.flush()

    def _fetch_cupid_service_result(self):
        (ret_len,) = struct.unpack("<I", self.sock.recv(4))
        ret = pickle.loads(self.sock.recv(ret_len))
        if not ret["status"]:
            six.reraise(*ret["exc_info"])
        else:
            return ret.get("result")

    def read_table_data(self, split_config, min_rows=None):
        import pyarrow as pa

        split_config = split_config.copy()
        split_config["min_rows"] = min_rows
        self._send_cupid_service_request(REQUEST_TYPE_READ_TABLE_DATA, split_config)

        sock_in_file = self.sock.makefile("rb")
        batches = []
        try:
            if min_rows is None:
                reader = pa.RecordBatchStreamReader(sock_in_file)
                return reader.read_all()
            else:
                ipc_reader = pa.ipc.open_stream(sock_in_file)
                while True:
                    try:
                        batches.append(ipc_reader.read_next_batch())
                    except StopIteration:
                        break
                return pa.Table.from_batches(batches)
        finally:
            batches[:] = []
            sock_in_file.close()

    def write_table_data(self, writer_config, to_store_data, write_batch_size=None):
        import pyarrow as pa
        from odps.mars_extension.utils import convert_pandas_object_to_string
        from odps.tunnel.io.types import odps_schema_to_arrow_schema

        writer_config = writer_config.copy()
        self._send_cupid_service_request(REQUEST_TYPE_WRITE_TABLE_DATA, writer_config)

        sock_out_file = self.sock.makefile("wb")
        batch_size = write_batch_size or 1024
        batch_idx = 0
        batch_data = to_store_data[
            batch_size * batch_idx : batch_size * (batch_idx + 1)
        ]
        batch_data = convert_pandas_object_to_string(batch_data)
        schema = odps_schema_to_arrow_schema(writer_config["_table_schema"])
        arrow_writer = _create_arrow_writer(sock_out_file, schema)
        while len(batch_data) > 0:
            batch = pa.RecordBatch.from_pandas(
                batch_data, schema=schema, preserve_index=False
            )
            arrow_writer.write_batch(batch)
            batch_idx += 1
            batch_data = to_store_data[
                batch_size * batch_idx : batch_size * (batch_idx + 1)
            ]
        arrow_writer.close()
        sock_out_file.flush()
        return self._fetch_cupid_service_result()

    def enum_table_partitions(self, odps_params, table_name, partition=None):
        cmd_pack = {
            "odps_params": odps_params,
            "table_name": table_name,
            "partition": partition,
        }
        self._send_cupid_service_request(REQUEST_TYPE_ENUM_TABLE_PARTITIONS, cmd_pack)
        return self._fetch_cupid_service_result()

    def create_table_download_session(
        self,
        odps_params,
        table_name,
        partition,
        columns,
        worker_count=None,
        split_size=None,
        max_chunk_num=None,
        with_split_meta_on_tile=False,
    ):
        cmd_pack = {
            "odps_params": odps_params,
            "table_name": table_name,
            "partition": partition,
            "columns": columns,
            "worker_count": worker_count,
            "split_size": split_size or CHUNK_BYTES_LIMIT,
            "max_chunk_num": max_chunk_num or MAX_CHUNK_NUM,
            "with_split_meta_on_tile": with_split_meta_on_tile,
        }
        self._send_cupid_service_request(
            REQUEST_TYPE_CREATE_TABLE_DOWNLOAD_SESSION, cmd_pack
        )
        result = self._fetch_cupid_service_result()
        return result["splits"], result["split_size"]
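
    # create_table_download_session returns (splits, split_size): each entry
    # of splits describes one cupid table split, and read_table_data above
    # streams a single split back as a pyarrow Table (or a bounded set of
    # record batches when min_rows is given).  How a split entry is turned
    # into the split_config dict consumed by read_table_data and
    # _handle_read_table_data is left to the caller.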

    def create_table_upload_session(self, odps_params, table_name):
        cmd_pack = {
            "odps_params": odps_params,
            "table_name": table_name,
        }
        self._send_cupid_service_request(
            REQUEST_TYPE_CREATE_TABLE_UPLOAD_SESSION, cmd_pack
        )
        result = self._fetch_cupid_service_result()
        return result["handle"]

    def commit_table_upload_session(
        self, odps_params, table_name, cupid_handle, blocks, overwrite=True
    ):
        cmd_pack = {
            "odps_params": odps_params,
            "table_name": table_name,
            "cupid_handle": cupid_handle,
            "blocks": blocks,
            "overwrite": overwrite,
        }
        self._send_cupid_service_request(
            REQUEST_TYPE_COMMIT_TABLE_UPLOAD_SESSION, cmd_pack
        )
        self._fetch_cupid_service_result()

    def get_kv(self, key):
        cmd_pack = {"key": key}
        self._send_cupid_service_request(REQUEST_TYPE_GET_KV, cmd_pack)
        result = self._fetch_cupid_service_result()
        return result["value"]

    def put_kv(self, key, value):
        cmd_pack = {"key": key, "value": value}
        self._send_cupid_service_request(REQUEST_TYPE_PUT_KV, cmd_pack)
        self._fetch_cupid_service_result()

    def terminate_instance(self, instance_id):
        cmd_pack = {"instance_id": instance_id}
        self._send_cupid_service_request(REQUEST_TYPE_TERMINATE_INSTANCE, cmd_pack)
        self._fetch_cupid_service_result()

    def get_bearer_token(self):
        self._send_cupid_service_request(REQUEST_TYPE_GET_BEARER_TOKEN, {})
        result = self._fetch_cupid_service_result()
        return result["token"]

    def report_container_status(self, status, message, progress, timeout=-1):
        self._send_cupid_service_request(
            REQUEST_TYPE_REPORT_CONTAINER_STATUS,
            {
                "status": status,
                "message": message,
                "progress": progress,
                "timeout": timeout,
            },
        )
        self._fetch_cupid_service_result()
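

# ---------------------------------------------------------------------------
# Illustrative end-to-end usage of CupidServiceClient.  This is a sketch
# only: it is not executed anywhere in this module, the table name is
# hypothetical, and odps_params is a dict providing at least "project" and
# "endpoint" (see the handlers above):
#
#     client = CupidServiceClient()          # uses $CUPID_SERVICE_SOCKET
#     token = client.get_bearer_token()
#     parts = client.enum_table_partitions(odps_params, "my_table")
#     splits, split_size = client.create_table_download_session(
#         odps_params, "my_table", partition=None, columns=None
#     )
#     client.close()
# ---------------------------------------------------------------------------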