in odps/mars_extension/oscar/cupid_service.py [0:0]
def _handle_create_table_download_session(sock):
    """Serve one "create table download session" request received on ``sock``.

    The request is framed as a 4-byte little-endian unsigned length followed
    by that many bytes of a pickled config dict. The dict must contain
    ``odps_params``, ``table_name``, ``columns``, ``split_size`` and
    ``max_chunk_num``. It may also contain ``partition``, ``worker_count`` and
    ``with_split_meta_on_tile``.

    The function creates a Cupid download session for the table, or for the
    partition if one is given. On success it writes
    ``{"splits": ..., "split_size": ...}`` back through
    ``_write_request_result``. Any failure is logged and reported to the peer
    as a failed result carrying ``exc_info``; it is not propagated.

    :param sock: connected stream socket. The request is read from it and the
        response is written to it.
    """

    def _recv_exactly(size):
        """Read exactly ``size`` bytes from ``sock``.

        ``socket.recv`` may return fewer bytes than requested on a stream
        socket, so a single call is not enough to read a framed message.
        Raises ``EOFError`` if the peer closes the connection early.
        """
        buf = bytearray()
        while len(buf) < size:
            chunk = sock.recv(size - len(buf))
            if not chunk:
                raise EOFError(
                    "Connection closed after receiving %d of %d bytes"
                    % (len(buf), size)
                )
            buf.extend(chunk)
        return bytes(buf)

    try:
        (cmd_len,) = struct.unpack("<I", _recv_exactly(4))
        # dict with odps_params, table_name, partition, columns, worker_count,
        # split_size, max_chunk_num
        # NOTE(review): pickle.loads can execute arbitrary code; this is only
        # safe if every peer able to connect to ``sock`` is trusted — confirm.
        session_config = pickle.loads(_recv_exactly(cmd_len))

        from odps import ODPS
        from odps.errors import ODPSError
        from odps.accounts import BearerTokenAccount
        from cupid import CupidSession, context
        from cupid.errors import CupidError
        from cupid.runtime import RuntimeContext

        if not RuntimeContext.is_context_ready():
            raise SystemError(
                "No Mars cluster found, please create via `o.create_mars_cluster`."
            )

        # Authenticate with the bearer token from the Cupid context. The
        # environment variables take precedence over the project/endpoint
        # sent by the client.
        cupid_ctx = context()
        odps_params = session_config["odps_params"]
        account = BearerTokenAccount(cupid_ctx.get_bearer_token())
        project = os.environ.get("ODPS_PROJECT_NAME", None) or odps_params["project"]
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(None, None, account=account, project=project, endpoint=endpoint)
        cupid_session = CupidSession(o)

        split_size = session_config["split_size"]
        table_name = session_config["table_name"]
        data_src = o.get_table(table_name)
        if session_config.get("partition") is not None:
            data_src = data_src.get_partition(session_config["partition"])

        try:
            data_store_size = data_src.size
        except ODPSError:
            # fail to get data size, just ignore
            pass
        else:
            # ``worker_count`` may be absent or None. A value of 0 must also
            # be skipped, otherwise the division below would fail.
            worker_count = session_config.get("worker_count")
            if data_store_size < split_size and worker_count:
                # data is too small: split as many as number of cores,
                # but keep every split at least 1M
                split_size = max(data_store_size // worker_count, 1 * 1024**2)
                logger.debug(
                    "Input data size is too small, split_size is %s", split_size
                )

        max_chunk_num = session_config["max_chunk_num"]
        columns = session_config["columns"]
        with_split_meta = session_config.get("with_split_meta_on_tile")
        logger.debug(
            "Start creating download session of table %s from cupid, columns %r",
            table_name,
            columns,
        )
        # Any CupidError from session creation is treated as "too many
        # splits". Retry with a doubled split size until creation succeeds
        # or the bound below is reached.
        while True:
            try:
                download_session = cupid_session.create_download_session(
                    data_src,
                    split_size=split_size,
                    columns=columns,
                    with_split_meta=with_split_meta,
                )
                break
            except CupidError:
                logger.debug(
                    "The number of splits exceeds 100000, split_size is %s",
                    split_size,
                )
                # NOTE(review): this bounds a byte-sized ``split_size`` by
                # ``max_chunk_num``; confirm the two are meant to be compared.
                if split_size >= max_chunk_num:
                    raise
                split_size *= 2

        _write_request_result(
            sock,
            result={"splits": download_session.splits, "split_size": split_size},
        )
    except Exception:
        # Report the failure to the peer as a failed result instead of
        # letting it propagate out of the handler.
        logger.exception("Failed to create download session")
        _write_request_result(sock, False, exc_info=sys.exc_info())