core/maxframe/conftest.py (153 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import faulthandler import os from configparser import ConfigParser, NoOptionError, NoSectionError import pytest from odps import ODPS from odps.accounts import BearerTokenAccount from .config import options faulthandler.enable(all_threads=True) _test_conf_file_name = os.path.join( os.path.dirname(os.path.abspath(__file__)), "tests", "test.conf" ) @pytest.fixture(scope="session") def test_config(): config = ConfigParser() config.read(_test_conf_file_name) return config def _get_odps_env(test_config: ConfigParser, section_name: str) -> ODPS: try: access_id = test_config.get(section_name, "access_id") except NoOptionError: access_id = test_config.get("odps", "access_id") if not access_id: access_id = os.getenv("ACCESS_ID") try: secret_access_key = test_config.get(section_name, "secret_access_key") except NoOptionError: secret_access_key = test_config.get("odps", "secret_access_key") if not secret_access_key: secret_access_key = os.getenv("SECRET_ACCESS_KEY") try: project = test_config.get(section_name, "project") except NoOptionError: project = test_config.get("odps", "project") try: endpoint = test_config.get(section_name, "endpoint") except NoOptionError: endpoint = test_config.get("odps", "endpoint") try: tunnel_endpoint = test_config.get("odps", "tunnel_endpoint") except NoOptionError: tunnel_endpoint = None entry = ODPS( access_id, secret_access_key, project, endpoint, overwrite_global=False ) policy = { "Version": "1", "Statement": [ {"Action": ["odps:*"], "Resource": "acs:odps:*:*", "Effect": "Allow"} ], } token = entry.get_project().generate_auth_token(policy, "bearer", 5) return ODPS( account=BearerTokenAccount(token, 5), project=project, endpoint=endpoint, tunnel_endpoint=tunnel_endpoint, ) @pytest.fixture(scope="session") def odps_with_schema(test_config): try: return _get_odps_env(test_config, "odps_with_schema") except NoSectionError: pytest.skip("Need to specify odps_with_schema section in test.conf") @pytest.fixture(scope="session", autouse=True) def odps_envs(test_config): entry = _get_odps_env(test_config, "odps") os.environ["ODPS_BEARER_TOKEN"] = entry.account.token os.environ["ODPS_PROJECT_NAME"] = entry.project os.environ["ODPS_ENDPOINT"] = entry.endpoint if entry.tunnel_endpoint: os.environ["ODPS_TUNNEL_ENDPOINT"] = entry.tunnel_endpoint try: yield finally: os.environ.pop("ODPS_BEARER_TOKEN", None) os.environ.pop("ODPS_PROJECT_NAME", None) os.environ.pop("ODPS_ENDPOINT", None) os.environ.pop("ODPS_TUNNEL_ENDPOINT", None) from .tests.utils import _test_tables_to_drop for table_name in _test_tables_to_drop: try: entry.delete_table(table_name, wait=False) except: pass @pytest.fixture(scope="session") def oss_config(): config = ConfigParser() config.read(_test_conf_file_name) old_role_arn = options.service_role_arn old_cache_url = options.object_cache_url try: oss_access_id = config.get("oss", "access_id") or os.getenv("ACCESS_ID") oss_secret_access_key = config.get("oss", "secret_access_key") or os.getenv( "SECRET_ACCESS_KEY" ) oss_bucket_name = config.get("oss", "bucket_name") oss_endpoint = config.get("oss", "endpoint") oss_rolearn = config.get("oss", "rolearn") options.service_role_arn = oss_rolearn if "test" in oss_endpoint: oss_svc_endpoint = oss_endpoint else: endpoint_parts = oss_endpoint.split(".", 1) if "-internal" not in endpoint_parts[0]: endpoint_parts[0] += "-internal" oss_svc_endpoint = ".".join(endpoint_parts) options.object_cache_url = f"oss://{oss_svc_endpoint}/{oss_bucket_name}" config.oss_config = ( oss_access_id, oss_secret_access_key, oss_bucket_name, oss_endpoint, ) import oss2 auth = oss2.Auth(oss_access_id, oss_secret_access_key) config.oss_bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name) config.oss_rolearn = oss_rolearn yield config except (NoSectionError, NoOptionError, ImportError): return None finally: options.service_role_arn = old_role_arn options.object_cache_url = old_cache_url @pytest.fixture(autouse=True) def apply_engine_selection(request): try: from maxframe_framedriver.services.analyzer import DagAnalyzer except ImportError: DagAnalyzer = None try: marks = list(request.node.iter_markers()) if request.node.parent: marks.extend(request.node.parent.iter_markers()) marks = [m for m in marks if m.name == "maxframe_engine"] if DagAnalyzer: engines = set() for mark in marks: engines.update(mark.args[0]) DagAnalyzer._enabled_engines = set(engines) yield finally: if DagAnalyzer: DagAnalyzer._enabled_engines = set() @pytest.fixture def local_test_envs(): spe_launcher_env = "MAXFRAME_SPE_LAUNCHER" old_value = os.getenv(spe_launcher_env) os.environ[spe_launcher_env] = "local" yield if old_value is not None: os.environ[spe_launcher_env] = old_value else: del os.environ[spe_launcher_env]