odps/models/session/core.py (120 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from ... import utils from ...compat import six from .v1 import McqaV1Methods from .v2 import McqaV2Methods def _redirect_v1(func): @six.wraps(func) def wrapped(*args, **kwargs): meth = getattr(McqaV1Methods, func.__name__) if args and isinstance(args[0], type): args = args[1:] return meth(*args, **kwargs) return wrapped class SessionMethods(object): @classmethod @utils.deprecated( "You no longer have to manipulate session instances to use MaxCompute QueryAcceleration. " "Try `run_sql_interactive`." ) @_redirect_v1 def attach_session(cls, odps, session_name, taskname=None, hints=None): """ Attach to an existing session. :param session_name: The session name. :param taskname: The created sqlrt task name. If not provided, the default value is used. Mostly doesn't matter, default works. :return: A SessionInstance you may execute select tasks within. """ pass @classmethod @_redirect_v1 def _attach_mcqa_session(cls, odps, session_name=None, task_name=None, hints=None): pass @classmethod @utils.deprecated( "You no longer have to manipulate session instances to use MaxCompute QueryAcceleration. " "Try `run_sql_interactive`." ) @_redirect_v1 def default_session(cls, odps): """ Attach to the default session of your project. :return: A SessionInstance you may execute select tasks within. """ pass @classmethod @_redirect_v1 def _get_default_mcqa_session( cls, odps, session_name=None, hints=None, wait=True, service_startup_timeout=60 ): pass @classmethod @utils.deprecated( "You no longer have to manipulate session instances to use MaxCompute QueryAcceleration. " "Try `run_sql_interactive`." ) @_redirect_v1 def create_session( cls, odps, session_worker_count, session_worker_memory, session_name=None, worker_spare_span=None, taskname=None, hints=None, ): """ Create session. :param session_worker_count: How much workers assigned to the session. :param session_worker_memory: How much memory each worker consumes. :param session_name: The session name. Not specifying to use its ID as name. :param worker_spare_span: format "00-24", allocated workers will be reduced during this time. Not specifying to disable this. :param taskname: The created sqlrt task name. If not provided, the default value is used. Mostly doesn't matter, default works. :param hints: Extra hints provided to the session. Parameters of this method will override certain hints. :return: A SessionInstance you may execute select tasks within. """ pass @classmethod @_redirect_v1 def _create_mcqa_session( cls, odps, session_worker_count, session_worker_memory, session_name=None, worker_spare_span=None, task_name=None, hints=None, ): pass @classmethod def _get_mcqa_v2_quota_name(cls, odps, hints=None, quota_name=None): hints = hints or {} return quota_name or hints.get("odps.task.wlm.quota") or odps.quota_name @classmethod def run_sql_interactive(cls, odps, sql, hints=None, use_mcqa_v2=False, **kwargs): """ Run SQL query in interactive mode (a.k.a MaxCompute QueryAcceleration). Won't fallback to offline mode automatically if query not supported or fails :param sql: the sql query. :param hints: settings for sql query. :return: instance. """ if use_mcqa_v2: quota_name = cls._get_mcqa_v2_quota_name( odps, hints=hints, quota_name=kwargs.pop("quota_name", None) ) return McqaV2Methods.run_sql_interactive( odps, sql, hints=hints, quota_name=quota_name, **kwargs ) return McqaV1Methods.run_sql_interactive(odps, sql, hints=hints, **kwargs) @classmethod @utils.deprecated( "The method `run_sql_interactive_with_fallback` is deprecated. " "Try `execute_sql_interactive` with fallback=True argument instead." ) def run_sql_interactive_with_fallback(cls, odps, sql, hints=None, **kwargs): return cls.execute_sql_interactive( odps, sql, hints=hints, fallback="all", wait_fallback=False, **kwargs ) @classmethod def execute_sql_interactive( cls, odps, sql, hints=None, fallback=True, wait_fallback=True, offline_quota_name=None, use_mcqa_v2=False, **kwargs ): """ Run SQL query in interactive mode (a.k.a MaxCompute QueryAcceleration). If query is not supported or fails, and fallback is True, will fallback to offline mode automatically :param sql: the sql query. :param hints: settings for sql query. :param fallback: fallback query to non-interactive mode, True by default. Both boolean type and policy names separated by commas are acceptable. :param bool wait_fallback: wait fallback instance to finish, True by default. :return: instance. """ if use_mcqa_v2: quota_name = cls._get_mcqa_v2_quota_name( odps, hints=hints, quota_name=kwargs.pop("quota_name", None) ) return McqaV2Methods.execute_sql_interactive( odps, sql, hints=hints, quota_name=quota_name, **kwargs ) return McqaV1Methods.execute_sql_interactive( odps, sql, hints=hints, fallback=fallback, wait_fallback=wait_fallback, offline_quota_name=offline_quota_name, **kwargs )