# _handle_create_table_download_session()
# in odps/mars_extension/oscar/cupid_service.py [0:0]


def _handle_create_table_download_session(sock):
    """Serve one create-table-download-session request on ``sock``.

    Wire format: a little-endian uint32 payload length, then a pickled dict
    with keys ``odps_params``, ``table_name``, ``partition``, ``columns``,
    ``worker_count``, ``split_size``, ``max_chunk_num`` and optionally
    ``with_split_meta_on_tile``.

    On success, writes ``{"splits": ..., "split_size": ...}`` back via
    ``_write_request_result``; on any failure, logs and writes the exception
    info to the peer instead of raising, so the serving loop keeps running.
    """

    def _recv_exact(nbytes):
        # socket.recv may return fewer bytes than requested; loop until the
        # whole message is read or the peer closes the connection.
        buf = bytearray()
        while len(buf) < nbytes:
            chunk = sock.recv(nbytes - len(buf))
            if not chunk:
                raise EOFError("Connection closed when reading request")
            buf.extend(chunk)
        return bytes(buf)

    try:
        (cmd_len,) = struct.unpack("<I", _recv_exact(4))
        # dict with odps_params, table_name, partition, columns, worker_count, split_size, max_chunk_num
        # NOTE(security): pickle.loads on socket data is only acceptable
        # because the peer is a trusted in-cluster process — never expose
        # this socket to untrusted clients.
        session_config = pickle.loads(_recv_exact(cmd_len))

        from odps import ODPS
        from odps.errors import ODPSError
        from odps.accounts import BearerTokenAccount
        from cupid import CupidSession, context
        from cupid.errors import CupidError
        from cupid.runtime import RuntimeContext

        if not RuntimeContext.is_context_ready():
            raise SystemError(
                "No Mars cluster found, please create via `o.create_mars_cluster`."
            )

        cupid_ctx = context()

        # Build an ODPS entry authenticated by the cluster bearer token;
        # environment variables take precedence over the request's odps_params.
        odps_params = session_config["odps_params"]
        bearer_token = cupid_ctx.get_bearer_token()
        account = BearerTokenAccount(bearer_token)
        project = os.environ.get("ODPS_PROJECT_NAME", None) or odps_params["project"]
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(None, None, account=account, project=project, endpoint=endpoint)
        cupid_session = CupidSession(o)

        split_size = session_config["split_size"]
        table_name = session_config["table_name"]
        data_src = o.get_table(table_name)
        if session_config.get("partition") is not None:
            data_src = data_src.get_partition(session_config["partition"])

        try:
            data_store_size = data_src.size
        except ODPSError:
            # fail to get data size, just ignore
            pass
        else:
            worker_count = session_config["worker_count"]
            # truthiness check guards against worker_count == 0 as well as
            # None, avoiding a ZeroDivisionError below
            if data_store_size < split_size and worker_count:
                # data is too small, split as many as number of cores
                split_size = data_store_size // worker_count
                # at least 1M
                split_size = max(split_size, 1 * 1024**2)
                logger.debug(
                    "Input data size is too small, split_size is {}".format(split_size)
                )

        max_chunk_num = session_config["max_chunk_num"]
        columns = session_config["columns"]
        with_split_meta = session_config.get("with_split_meta_on_tile")

        logger.debug(
            "Start creating download session of table %s from cupid, columns %r",
            table_name,
            columns,
        )
        # Cupid rejects sessions with too many splits; retry with a doubled
        # split_size until creation succeeds or the cap is reached.
        while True:
            try:
                download_session = cupid_session.create_download_session(
                    data_src,
                    split_size=split_size,
                    columns=columns,
                    with_split_meta=with_split_meta,
                )
                break
            except CupidError:
                logger.debug(
                    "The number of splits exceeds 100000, split_size is {}".format(
                        split_size
                    )
                )
                # NOTE(review): comparing split_size (bytes) with
                # max_chunk_num (a count) looks unit-mismatched, but it is the
                # established retry cap — confirm intent before changing.
                if split_size >= max_chunk_num:
                    raise
                else:
                    split_size *= 2

        ret_data = {
            "splits": download_session.splits,
            "split_size": split_size,
        }
        _write_request_result(sock, result=ret_data)
    except Exception:
        # was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt; report the failure to the peer instead
        logger.exception("Failed to create download session")
        _write_request_result(sock, False, exc_info=sys.exc_info())