dev/breeze/src/airflow_breeze/utils/selective_checks.py (1,240 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import annotations import difflib import itertools import json import os import re import sys from collections import defaultdict from enum import Enum from functools import cached_property from pathlib import Path from typing import Any, TypeVar from airflow_breeze.branch_defaults import AIRFLOW_BRANCH, DEFAULT_AIRFLOW_CONSTRAINTS_BRANCH from airflow_breeze.global_constants import ( ALL_PYTHON_MAJOR_MINOR_VERSIONS, APACHE_AIRFLOW_GITHUB_REPOSITORY, COMMITTERS, CURRENT_KUBERNETES_VERSIONS, CURRENT_MYSQL_VERSIONS, CURRENT_POSTGRES_VERSIONS, CURRENT_PYTHON_MAJOR_MINOR_VERSIONS, DEFAULT_KUBERNETES_VERSION, DEFAULT_MYSQL_VERSION, DEFAULT_POSTGRES_VERSION, DEFAULT_PYTHON_MAJOR_MINOR_VERSION, DISABLE_TESTABLE_INTEGRATIONS_FROM_CI, HELM_VERSION, KIND_VERSION, NUMBER_OF_LOW_DEP_SLICES, PROVIDERS_COMPATIBILITY_TESTS_MATRIX, PUBLIC_AMD_RUNNERS, PUBLIC_ARM_RUNNERS, TESTABLE_CORE_INTEGRATIONS, TESTABLE_PROVIDERS_INTEGRATIONS, GithubEvents, SelectiveAirflowCtlTestType, SelectiveCoreTestType, SelectiveProvidersTestType, SelectiveTaskSdkTestType, all_helm_test_packages, all_selective_core_test_types, providers_test_type, ) from airflow_breeze.utils.console import get_console from airflow_breeze.utils.exclude_from_matrix import excluded_combos from airflow_breeze.utils.functools_cache import clearable_cache from airflow_breeze.utils.kubernetes_utils import get_kubernetes_python_combos from airflow_breeze.utils.packages import get_available_distributions from airflow_breeze.utils.path_utils import ( AIRFLOW_DEVEL_COMMON_PATH, AIRFLOW_PROVIDERS_ROOT_PATH, AIRFLOW_ROOT_PATH, ) from airflow_breeze.utils.provider_dependencies import DEPENDENCIES, get_related_providers from airflow_breeze.utils.run_utils import run_command ALL_VERSIONS_LABEL = "all versions" CANARY_LABEL = "canary" DEBUG_CI_RESOURCES_LABEL = "debug ci resources" DEFAULT_VERSIONS_ONLY_LABEL = "default versions only" DISABLE_IMAGE_CACHE_LABEL = "disable image cache" FORCE_PIP_LABEL = "force pip" FULL_TESTS_NEEDED_LABEL = "full tests needed" INCLUDE_SUCCESS_OUTPUTS_LABEL = "include success outputs" LATEST_VERSIONS_ONLY_LABEL = "latest versions only" LOG_WITHOUT_MOCK_IN_TESTS_EXCEPTION_LABEL = "log exception" NON_COMMITTER_BUILD_LABEL = "non committer build" UPGRADE_TO_NEWER_DEPENDENCIES_LABEL = "upgrade to newer dependencies" USE_PUBLIC_RUNNERS_LABEL = "use public runners" ALL_CI_SELECTIVE_TEST_TYPES = "API Always CLI Core Other Serialization" ALL_PROVIDERS_SELECTIVE_TEST_TYPES = ( "Providers[-amazon,google,standard] Providers[amazon] Providers[google] Providers[standard]" ) class FileGroupForCi(Enum): ENVIRONMENT_FILES = "environment_files" PYTHON_PRODUCTION_FILES = "python_scans" JAVASCRIPT_PRODUCTION_FILES = "javascript_scans" ALWAYS_TESTS_FILES = "always_test_files" API_FILES = "api_files" GIT_PROVIDER_FILES = "git_provider_files" API_CODEGEN_FILES = "api_codegen_files" HELM_FILES = "helm_files" DEPENDENCY_FILES = "dependency_files" DOC_FILES = "doc_files" UI_FILES = "ui_files" SYSTEM_TEST_FILES = "system_tests" KUBERNETES_FILES = "kubernetes_files" TASK_SDK_FILES = "task_sdk_files" AIRFLOW_CTL_FILES = "airflow_ctl_files" ALL_PYTHON_FILES = "all_python_files" ALL_SOURCE_FILES = "all_sources_for_tests" ALL_AIRFLOW_PYTHON_FILES = "all_airflow_python_files" ALL_AIRFLOW_CTL_PYTHON_FILES = "all_airflow_ctl_python_files" ALL_PROVIDERS_PYTHON_FILES = "all_provider_python_files" ALL_DEV_PYTHON_FILES = "all_dev_python_files" ALL_DEVEL_COMMON_PYTHON_FILES = "all_devel_common_python_files" ALL_PROVIDER_YAML_FILES = "all_provider_yaml_files" TESTS_UTILS_FILES = "test_utils_files" ASSET_FILES = "asset_files" UNIT_TEST_FILES = "unit_test_files" class AllProvidersSentinel: pass ALL_PROVIDERS_SENTINEL = AllProvidersSentinel() T = TypeVar("T", FileGroupForCi, SelectiveCoreTestType) class HashableDict(dict[T, list[str]]): def __hash__(self): return hash(frozenset(self)) CI_FILE_GROUP_MATCHES = HashableDict( { FileGroupForCi.ENVIRONMENT_FILES: [ r"^.github/workflows", r"^dev/breeze", r"^dev/.*\.py$", r"^Dockerfile", r"^scripts/ci/docker-compose", r"^scripts/ci/kubernetes", r"^scripts/docker", r"^scripts/in_container", r"^generated/provider_dependencies.json$", ], FileGroupForCi.PYTHON_PRODUCTION_FILES: [ r"^airflow-core/src/airflow/.*\.py", r"^providers/.*\.py", r"^pyproject.toml", r"^hatch_build.py", ], FileGroupForCi.JAVASCRIPT_PRODUCTION_FILES: [ r"^airflow-core/src/airflow/.*\.[jt]sx?", r"^airflow-core/src/airflow/.*\.lock", r"^airflow-core/src/airflow/ui/.*\.yaml$", r"^airflow-core/src/airflow/api_fastapi/auth/managers/simple/ui/.*\.yaml$", ], FileGroupForCi.API_FILES: [ r"^airflow-core/src/airflow/api/", r"^airflow-core/src/airflow/api_fastapi/", r"^airflow-core/tests/unit/api/", r"^airflow-core/tests/unit/api_fastapi/", ], FileGroupForCi.GIT_PROVIDER_FILES: [ r"^providers/git/src/", ], FileGroupForCi.API_CODEGEN_FILES: [ r"^airflow-core/src/airflow/api_fastapi/core_api/openapi/.*generated\.yaml", r"^clients/gen", ], FileGroupForCi.HELM_FILES: [ r"^chart", r"^airflow-core/src/airflow/kubernetes", r"^airflow-core/tests/unit/kubernetes", r"^helm-tests", ], FileGroupForCi.DEPENDENCY_FILES: [ r"^generated/provider_dependencies.json$", ], FileGroupForCi.DOC_FILES: [ r"^docs", r"^devel-common/src/docs", r"^\.github/SECURITY\.md", r"^airflow-core/src/.*\.py$", r"^airflow-core/docs/", r"^providers/.*/src/", r"^providers/.*/docs/", r"^providers-summary-docs", r"^docker-stack-docs", r"^chart", r"^task-sdk/src/", r"^airflow-ctl/src/", r"^airflow-core/tests/system", r"^airflow-ctl/src", r"^airflow-ctl/docs", r"^CHANGELOG\.txt", r"^airflow-core/src/airflow/config_templates/config\.yml", r"^chart/RELEASE_NOTES\.rst", r"^chart/values\.schema\.json", r"^chart/values\.json", r"^RELEASE_NOTES\.rst", ], FileGroupForCi.UI_FILES: [ r"^airflow-core/src/airflow/ui/", r"^airflow-core/src/airflow/api_fastapi/auth/managers/simple/ui/", ], FileGroupForCi.KUBERNETES_FILES: [ r"^chart", r"^kubernetes-tests", r"^providers/cncf/kubernetes/", ], FileGroupForCi.ALL_PYTHON_FILES: [ r".*\.py$", ], FileGroupForCi.ALL_AIRFLOW_PYTHON_FILES: [ r"^airflow-core/.*\.py$", ], FileGroupForCi.ALL_AIRFLOW_CTL_PYTHON_FILES: [ r"^airflow-ctl/.*\.py$", ], FileGroupForCi.ALL_PROVIDERS_PYTHON_FILES: [ r"^providers/.*\.py$", ], FileGroupForCi.ALL_DEV_PYTHON_FILES: [ r"^dev/.*\.py$", ], FileGroupForCi.ALL_DEVEL_COMMON_PYTHON_FILES: [ r"^devel-common/.*\.py$", ], FileGroupForCi.ALL_SOURCE_FILES: [ r"^.pre-commit-config.yaml$", r"^airflow-core/.*", r"^airflow-ctl/.*", r"^chart/.*", r"^providers/.*", r"^task-sdk/.*", r"^devel-common/.*", r"^kubernetes-tests/.*", r"^docker-tests/.*", r"^dev/.*", ], FileGroupForCi.SYSTEM_TEST_FILES: [ r"^airflow-core/tests/system/", ], FileGroupForCi.ALWAYS_TESTS_FILES: [ r"^airflow-core/tests/unit/always/", ], FileGroupForCi.ALL_PROVIDER_YAML_FILES: [ r".*/provider\.yaml$", ], FileGroupForCi.TESTS_UTILS_FILES: [ r"^airflow-core/tests/unit/utils/", r"^devel-common/.*\.py$", ], FileGroupForCi.TASK_SDK_FILES: [ r"^task-sdk/src/airflow/sdk/.*\.py$", r"^task-sdk/tests/.*\.py$", ], FileGroupForCi.ASSET_FILES: [ r"^airflow-core/src/airflow/assets/", r"^airflow-core/src/airflow/models/assets/", r"^airflow-core/src/airflow/datasets/", r"^task-sdk/src/airflow/sdk/definitions/asset/", ], FileGroupForCi.UNIT_TEST_FILES: [ r"^airflow-core/tests/unit/", r"^task-sdk/tests/", r"^providers/.*/tests/unit/", r"^dev/breeze/tests/", r"^airflow-ctl/tests/", ], FileGroupForCi.AIRFLOW_CTL_FILES: [ r"^airflow-ctl/src/airflowctl/.*\.py$", r"^airflow-ctl/tests/.*\.py$", ], } ) PYTHON_OPERATOR_FILES = [ r"^providers/tests/standard/operators/test_python.py", ] TEST_TYPE_MATCHES = HashableDict( { SelectiveCoreTestType.API: [ r"^airflow-core/src/airflow/api/", r"^airflow-core/src/airflow/api_fastapi/", r"^airflow-core/tests/unit/api/", r"^airflow-core/tests/unit/api_fastapi/", ], SelectiveCoreTestType.CLI: [ r"^airflow-core/src/airflow/cli/", r"^airflow-core/tests/unit/cli/", ], SelectiveProvidersTestType.PROVIDERS: [ r"^providers/.*/src/airflow/providers/", r"^providers/.*/tests/", ], SelectiveTaskSdkTestType.TASK_SDK: [ r"^task-sdk/src/", r"^task-sdk/tests/", ], SelectiveCoreTestType.SERIALIZATION: [ r"^airflow-core/src/airflow/serialization/", r"^airflow-core/tests/unit/serialization/", ], SelectiveAirflowCtlTestType.AIRFLOW_CTL: [ r"^airflow-ctl/src/", r"^airflow-ctl/tests/", ], } ) def find_provider_affected(changed_file: str, include_docs: bool) -> str | None: file_path = AIRFLOW_ROOT_PATH / changed_file if not include_docs: for parent_dir_path in file_path.parents: if parent_dir_path.name == "docs" and (parent_dir_path.parent / "provider.yaml").exists(): # Skip Docs changes if include_docs is not set return None # Find if the path under src/system tests/tests belongs to provider or is a common code across # multiple providers for parent_dir_path in file_path.parents: if parent_dir_path == AIRFLOW_PROVIDERS_ROOT_PATH: # We have not found any provider specific path up to the root of the provider base folder break if parent_dir_path.is_relative_to(AIRFLOW_PROVIDERS_ROOT_PATH): relative_path = parent_dir_path.relative_to(AIRFLOW_PROVIDERS_ROOT_PATH) # check if this path belongs to a specific provider if (parent_dir_path / "provider.yaml").exists(): # new providers structure return str(relative_path).replace(os.sep, ".") if file_path.is_relative_to(AIRFLOW_DEVEL_COMMON_PATH): # if devel-common changes, we want to run tests for all providers, as they might start failing return "Providers" return None def _match_files_with_regexps(files: tuple[str, ...], matched_files, matching_regexps): for file in files: if any(re.match(regexp, file) for regexp in matching_regexps): matched_files.append(file) def _exclude_files_with_regexps(files: tuple[str, ...], matched_files, exclude_regexps): for file in files: if any(re.match(regexp, file) for regexp in exclude_regexps): if file in matched_files: matched_files.remove(file) @clearable_cache def _matching_files( files: tuple[str, ...], match_group: FileGroupForCi, match_dict: HashableDict ) -> list[str]: matched_files: list[str] = [] match_regexps = match_dict[match_group] _match_files_with_regexps(files, matched_files, match_regexps) count = len(matched_files) if count > 0: get_console().print(f"[warning]{match_group} matched {count} files.[/]") get_console().print(matched_files) else: get_console().print(f"[warning]{match_group} did not match any file.[/]") return matched_files # TODO: In Python 3.12 we will be able to use itertools.batched def _split_list(input_list, n) -> list[list[str]]: """Splits input_list into n sub-lists.""" it = iter(input_list) return [ list(itertools.islice(it, i)) for i in [len(input_list) // n + (1 if x < len(input_list) % n else 0) for x in range(n)] ] def _get_test_type_description(provider_test_types: list[str]) -> str: if not provider_test_types: return "" first_provider = provider_test_types[0] last_provider = provider_test_types[-1] if first_provider.startswith("Providers["): first_provider = first_provider.replace("Providers[", "").replace("]", "") if last_provider.startswith("Providers["): last_provider = last_provider.replace("Providers[", "").replace("]", "") return ( f"{first_provider[:13]}...{last_provider[:13]}" if first_provider != last_provider else (first_provider[:29]) ) def _get_test_list_as_json(list_of_list_of_types: list[list[str]]) -> list[dict[str, str]] | None: if len(list_of_list_of_types) == 1 and len(list_of_list_of_types[0]) == 0: return None return [ {"description": _get_test_type_description(list_of_types), "test_types": " ".join(list_of_types)} for list_of_types in list_of_list_of_types ] class SelectiveChecks: __HASHABLE_FIELDS = {"_files", "_default_branch", "_commit_ref", "_pr_labels", "_github_event"} def __init__( self, files: tuple[str, ...] = (), default_branch=AIRFLOW_BRANCH, default_constraints_branch=DEFAULT_AIRFLOW_CONSTRAINTS_BRANCH, commit_ref: str | None = None, pr_labels: tuple[str, ...] = (), github_event: GithubEvents = GithubEvents.PULL_REQUEST, github_repository: str = APACHE_AIRFLOW_GITHUB_REPOSITORY, github_actor: str = "", github_context_dict: dict[str, Any] | None = None, ): self._files = files self._default_branch = default_branch self._default_constraints_branch = default_constraints_branch self._commit_ref = commit_ref self._pr_labels = pr_labels self._github_event = github_event self._github_repository = github_repository self._github_actor = github_actor self._github_context_dict = github_context_dict or {} self._new_toml: dict[str, Any] = {} self._old_toml: dict[str, Any] = {} def __important_attributes(self) -> tuple[Any, ...]: return tuple(getattr(self, f) for f in self.__HASHABLE_FIELDS) def __hash__(self): return hash(self.__important_attributes()) def __eq__(self, other): return isinstance(other, SelectiveChecks) and all( [getattr(other, f) == getattr(self, f) for f in self.__HASHABLE_FIELDS] ) def __str__(self) -> str: from airflow_breeze.utils.github import get_ga_output output = [] for field_name in dir(self): if not field_name.startswith("_"): value = getattr(self, field_name) if value is not None: output.append(get_ga_output(field_name, value)) return "\n".join(output) default_postgres_version = DEFAULT_POSTGRES_VERSION default_mysql_version = DEFAULT_MYSQL_VERSION default_kubernetes_version = DEFAULT_KUBERNETES_VERSION default_kind_version = KIND_VERSION default_helm_version = HELM_VERSION @cached_property def latest_versions_only(self) -> bool: return LATEST_VERSIONS_ONLY_LABEL in self._pr_labels @cached_property def default_python_version(self) -> str: return ( CURRENT_PYTHON_MAJOR_MINOR_VERSIONS[-1] if LATEST_VERSIONS_ONLY_LABEL in self._pr_labels else DEFAULT_PYTHON_MAJOR_MINOR_VERSION ) @cached_property def default_branch(self) -> str: return self._default_branch @cached_property def default_constraints_branch(self) -> str: return self._default_constraints_branch def _should_run_all_tests_and_versions(self) -> bool: if self._github_event in [GithubEvents.PUSH, GithubEvents.SCHEDULE, GithubEvents.WORKFLOW_DISPATCH]: get_console().print(f"[warning]Running everything because event is {self._github_event}[/]") return True if not self._commit_ref: get_console().print("[warning]Running everything in all versions as commit is missing[/]") return True if self.hatch_build_changed: get_console().print("[warning]Running everything with all versions: hatch_build.py changed[/]") return True if self.pyproject_toml_changed and self.build_system_changed_in_pyproject_toml: get_console().print( "[warning]Running everything with all versions: build-system changed in pyproject.toml[/]" ) return True if self.generated_dependencies_changed: get_console().print( "[warning]Running everything with all versions: provider dependencies changed[/]" ) return True return False @cached_property def all_versions(self) -> bool: if DEFAULT_VERSIONS_ONLY_LABEL in self._pr_labels: return False if LATEST_VERSIONS_ONLY_LABEL in self._pr_labels: return False if ALL_VERSIONS_LABEL in self._pr_labels: return True if self._should_run_all_tests_and_versions(): return True return False @cached_property def full_tests_needed(self) -> bool: if self._should_run_all_tests_and_versions(): return True if self._matching_files( FileGroupForCi.ENVIRONMENT_FILES, CI_FILE_GROUP_MATCHES, ): get_console().print("[warning]Running full set of tests because env files changed[/]") return True if self._matching_files( FileGroupForCi.API_FILES, CI_FILE_GROUP_MATCHES, ): get_console().print("[warning]Running full set of tests because api files changed[/]") return True if self._matching_files( FileGroupForCi.GIT_PROVIDER_FILES, CI_FILE_GROUP_MATCHES, ): # TODO(potiuk): remove me when we get rid of the dependency get_console().print( "[warning]Running full set of tests because git provider files changed " "and for now we have core tests depending on them.[/]" ) return True if self._matching_files( FileGroupForCi.TESTS_UTILS_FILES, CI_FILE_GROUP_MATCHES, ): get_console().print("[warning]Running full set of tests because tests/utils changed[/]") return True if FULL_TESTS_NEEDED_LABEL in self._pr_labels: get_console().print( "[warning]Full tests needed because " f"label '{FULL_TESTS_NEEDED_LABEL}' is in {self._pr_labels}[/]" ) return True return False @cached_property def python_versions(self) -> list[str]: if self.all_versions: return CURRENT_PYTHON_MAJOR_MINOR_VERSIONS if self.latest_versions_only: return [CURRENT_PYTHON_MAJOR_MINOR_VERSIONS[-1]] return [DEFAULT_PYTHON_MAJOR_MINOR_VERSION] @cached_property def python_versions_list_as_string(self) -> str: return " ".join(self.python_versions) @cached_property def all_python_versions(self) -> list[str]: """ All python versions include all past python versions available in previous branches Even if we remove them from the main version. This is needed to make sure we can cherry-pick changes from main to the previous branch. """ if self.all_versions: return ALL_PYTHON_MAJOR_MINOR_VERSIONS if self.latest_versions_only: return [CURRENT_PYTHON_MAJOR_MINOR_VERSIONS[-1]] return [DEFAULT_PYTHON_MAJOR_MINOR_VERSION] @cached_property def all_python_versions_list_as_string(self) -> str: return " ".join(self.all_python_versions) @cached_property def postgres_versions(self) -> list[str]: if self.all_versions: return CURRENT_POSTGRES_VERSIONS if self.latest_versions_only: return [CURRENT_POSTGRES_VERSIONS[-1]] return [DEFAULT_POSTGRES_VERSION] @cached_property def mysql_versions(self) -> list[str]: if self.all_versions: return CURRENT_MYSQL_VERSIONS if self.latest_versions_only: return [CURRENT_MYSQL_VERSIONS[-1]] return [DEFAULT_MYSQL_VERSION] @cached_property def kind_version(self) -> str: return KIND_VERSION @cached_property def helm_version(self) -> str: return HELM_VERSION @cached_property def postgres_exclude(self) -> list[dict[str, str]]: if not self.all_versions: # Only basic combination so we do not need to exclude anything return [] return [ # Exclude all combinations that are repeating python/postgres versions {"python-version": python_version, "backend-version": postgres_version} for python_version, postgres_version in excluded_combos( CURRENT_PYTHON_MAJOR_MINOR_VERSIONS, CURRENT_POSTGRES_VERSIONS ) ] @cached_property def mysql_exclude(self) -> list[dict[str, str]]: if not self.all_versions: # Only basic combination so we do not need to exclude anything return [] return [ # Exclude all combinations that are repeating python/mysql versions {"python-version": python_version, "backend-version": mysql_version} for python_version, mysql_version in excluded_combos( CURRENT_PYTHON_MAJOR_MINOR_VERSIONS, CURRENT_MYSQL_VERSIONS ) ] @cached_property def sqlite_exclude(self) -> list[dict[str, str]]: return [] @cached_property def kubernetes_versions(self) -> list[str]: if self.all_versions: return CURRENT_KUBERNETES_VERSIONS if self.latest_versions_only: return [CURRENT_KUBERNETES_VERSIONS[-1]] return [DEFAULT_KUBERNETES_VERSION] @cached_property def kubernetes_versions_list_as_string(self) -> str: return " ".join(self.kubernetes_versions) @cached_property def kubernetes_combos(self) -> list[str]: python_version_array: list[str] = self.python_versions_list_as_string.split(" ") kubernetes_version_array: list[str] = self.kubernetes_versions_list_as_string.split(" ") combo_titles, short_combo_titles, combos = get_kubernetes_python_combos( kubernetes_version_array, python_version_array ) return short_combo_titles @cached_property def kubernetes_combos_list_as_string(self) -> str: return " ".join(self.kubernetes_combos) def _matching_files(self, match_group: FileGroupForCi, match_dict: HashableDict) -> list[str]: return _matching_files(self._files, match_group, match_dict) def _should_be_run(self, source_area: FileGroupForCi) -> bool: if self.full_tests_needed: get_console().print(f"[warning]{source_area} enabled because we are running everything[/]") return True matched_files = self._matching_files(source_area, CI_FILE_GROUP_MATCHES) if matched_files: get_console().print( f"[warning]{source_area} enabled because it matched {len(matched_files)} changed files[/]" ) return True get_console().print(f"[warning]{source_area} disabled because it did not match any changed files[/]") return False @cached_property def mypy_checks(self) -> list[str]: checks_to_run: list[str] = [] if ( self._matching_files(FileGroupForCi.ALL_AIRFLOW_PYTHON_FILES, CI_FILE_GROUP_MATCHES) or self.full_tests_needed ): checks_to_run.append("mypy-airflow-core") if ( self._matching_files(FileGroupForCi.ALL_PROVIDERS_PYTHON_FILES, CI_FILE_GROUP_MATCHES) or self._are_all_providers_affected() ) and self._default_branch == "main": checks_to_run.append("mypy-providers") if ( self._matching_files(FileGroupForCi.ALL_DEV_PYTHON_FILES, CI_FILE_GROUP_MATCHES) or self.full_tests_needed ): checks_to_run.append("mypy-dev") if ( self._matching_files(FileGroupForCi.TASK_SDK_FILES, CI_FILE_GROUP_MATCHES) or self.full_tests_needed ): checks_to_run.append("mypy-task-sdk") if ( self._matching_files(FileGroupForCi.ALL_DEVEL_COMMON_PYTHON_FILES, CI_FILE_GROUP_MATCHES) or self.full_tests_needed ): checks_to_run.append("mypy-devel-common") if ( self._matching_files(FileGroupForCi.ALL_AIRFLOW_CTL_PYTHON_FILES, CI_FILE_GROUP_MATCHES) or self.full_tests_needed ): checks_to_run.append("mypy-airflow-ctl") return checks_to_run @cached_property def needs_mypy(self) -> bool: return self.mypy_checks != [] @cached_property def needs_python_scans(self) -> bool: return self._should_be_run(FileGroupForCi.PYTHON_PRODUCTION_FILES) @cached_property def needs_javascript_scans(self) -> bool: return self._should_be_run(FileGroupForCi.JAVASCRIPT_PRODUCTION_FILES) @cached_property def needs_api_tests(self) -> bool: return self._should_be_run(FileGroupForCi.API_FILES) @cached_property def needs_ol_tests(self) -> bool: return self._should_be_run(FileGroupForCi.ASSET_FILES) @cached_property def needs_api_codegen(self) -> bool: return self._should_be_run(FileGroupForCi.API_CODEGEN_FILES) @cached_property def run_ui_tests(self) -> bool: return self._should_be_run(FileGroupForCi.UI_FILES) @cached_property def run_amazon_tests(self) -> bool: if self.providers_test_types_list_as_strings_in_json == "[]": return False return ( "amazon" in self.providers_test_types_list_as_strings_in_json or "Providers" in self.providers_test_types_list_as_strings_in_json.split(" ") ) @cached_property def run_task_sdk_tests(self) -> bool: return self._should_be_run(FileGroupForCi.TASK_SDK_FILES) @cached_property def run_airflow_ctl_tests(self) -> bool: return self._should_be_run(FileGroupForCi.AIRFLOW_CTL_FILES) @cached_property def run_kubernetes_tests(self) -> bool: return self._should_be_run(FileGroupForCi.KUBERNETES_FILES) @cached_property def docs_build(self) -> bool: return self._should_be_run(FileGroupForCi.DOC_FILES) @cached_property def needs_helm_tests(self) -> bool: return self._should_be_run(FileGroupForCi.HELM_FILES) and self._default_branch == "main" @cached_property def run_tests(self) -> bool: if self.full_tests_needed: return True if self._is_canary_run(): return True if self.only_new_ui_files: return False # we should run all test return self._should_be_run(FileGroupForCi.ALL_SOURCE_FILES) @cached_property def run_system_tests(self) -> bool: return self.run_tests @cached_property def only_pyproject_toml_files_changed(self) -> bool: return all(Path(file).name == "pyproject.toml" for file in self._files) @cached_property def ci_image_build(self) -> bool: # in case pyproject.toml changed, CI image should be built - even if no build dependencies # changes because some of our tests - those that need CI image might need to be run depending on # changed rules for static checks that are part of the pyproject.toml file return ( self.run_tests or self.docs_build or self.run_kubernetes_tests or self.needs_helm_tests or self.pyproject_toml_changed or self.any_provider_yaml_or_pyproject_toml_changed ) @cached_property def prod_image_build(self) -> bool: return self.run_kubernetes_tests or self.needs_helm_tests def _select_test_type_if_matching( self, test_types: set[str], test_type: SelectiveCoreTestType ) -> list[str]: matched_files = self._matching_files(test_type, TEST_TYPE_MATCHES) count = len(matched_files) if count > 0: test_types.add(test_type.value) get_console().print(f"[warning]{test_type} added because it matched {count} files[/]") return matched_files def _are_all_providers_affected(self) -> bool: # if "Providers" test is present in the list of tests, it means that we should run all providers tests # prepare all providers packages and build all providers documentation return "Providers" in self._get_providers_test_types_to_run() def _fail_if_suspended_providers_affected(self) -> bool: return "allow suspended provider changes" not in self._pr_labels def _get_core_test_types_to_run(self) -> list[str]: if self.full_tests_needed: return list(all_selective_core_test_types()) candidate_test_types: set[str] = {"Always"} matched_files: set[str] = set() for test_type in SelectiveCoreTestType: if test_type not in [ SelectiveCoreTestType.ALWAYS, SelectiveCoreTestType.CORE, SelectiveCoreTestType.OTHER, ]: matched_files.update(self._select_test_type_if_matching(candidate_test_types, test_type)) kubernetes_files = self._matching_files(FileGroupForCi.KUBERNETES_FILES, CI_FILE_GROUP_MATCHES) system_test_files = self._matching_files(FileGroupForCi.SYSTEM_TEST_FILES, CI_FILE_GROUP_MATCHES) all_source_files = self._matching_files(FileGroupForCi.ALL_SOURCE_FILES, CI_FILE_GROUP_MATCHES) all_providers_source_files = self._matching_files( FileGroupForCi.ALL_PROVIDERS_PYTHON_FILES, CI_FILE_GROUP_MATCHES ) test_always_files = self._matching_files(FileGroupForCi.ALWAYS_TESTS_FILES, CI_FILE_GROUP_MATCHES) test_ui_files = self._matching_files(FileGroupForCi.UI_FILES, CI_FILE_GROUP_MATCHES) remaining_files = ( set(all_source_files) - set(all_providers_source_files) - set(matched_files) - set(kubernetes_files) - set(system_test_files) - set(test_always_files) - set(test_ui_files) ) get_console().print(f"[warning]Remaining non test/always files: {len(remaining_files)}[/]") count_remaining_files = len(remaining_files) for file in self._files: if file.endswith("bash.py") and Path(file).parent.name == "operators": candidate_test_types.add("Serialization") candidate_test_types.add("Core") break if count_remaining_files > 0: get_console().print( f"[warning]We should run all core tests except providers." f"There are {count_remaining_files} changed files that seems to fall " f"into Core/Other category[/]" ) get_console().print(remaining_files) candidate_test_types.update(all_selective_core_test_types()) else: get_console().print( "[warning]There are no core/other files. Only tests relevant to the changed files are run.[/]" ) # sort according to predefined order sorted_candidate_test_types = sorted(candidate_test_types) get_console().print("[warning]Selected core test type candidates to run:[/]") get_console().print(sorted_candidate_test_types) return sorted_candidate_test_types def _get_providers_test_types_to_run(self, split_to_individual_providers: bool = False) -> list[str]: if self._default_branch != "main": return [] if self.full_tests_needed or self.run_task_sdk_tests: if split_to_individual_providers: return list(providers_test_type()) return ["Providers"] all_providers_source_files = self._matching_files( FileGroupForCi.ALL_PROVIDERS_PYTHON_FILES, CI_FILE_GROUP_MATCHES ) assets_source_files = self._matching_files(FileGroupForCi.ASSET_FILES, CI_FILE_GROUP_MATCHES) if ( len(all_providers_source_files) == 0 and len(assets_source_files) == 0 and not self.needs_api_tests ): # IF API tests are needed, that will trigger extra provider checks return [] affected_providers = self._find_all_providers_affected( include_docs=False, ) candidate_test_types: set[str] = set() if isinstance(affected_providers, AllProvidersSentinel): if split_to_individual_providers: for provider in get_available_distributions(): candidate_test_types.add(f"Providers[{provider}]") else: candidate_test_types.add("Providers") elif affected_providers: if split_to_individual_providers: for provider in affected_providers: candidate_test_types.add(f"Providers[{provider}]") else: candidate_test_types.add(f"Providers[{','.join(sorted(affected_providers))}]") sorted_candidate_test_types = sorted(candidate_test_types) get_console().print("[warning]Selected providers test type candidates to run:[/]") get_console().print(sorted_candidate_test_types) return sorted_candidate_test_types @staticmethod def _extract_long_provider_tests(current_test_types: set[str]): """ In case there are Provider tests in the list of test to run - either in the form of Providers or Providers[...] we subtract them from the test type, and add them to the list of tests to run individually. In case of Providers, we need to replace it with Providers[-<list_of_long_tests>], but in case of Providers[list_of_tests] we need to remove the long tests from the list. """ long_tests = ["amazon", "google", "standard"] for original_test_type in tuple(current_test_types): if original_test_type == "Providers": current_test_types.remove(original_test_type) for long_test in long_tests: current_test_types.add(f"Providers[{long_test}]") current_test_types.add(f"Providers[-{','.join(long_tests)}]") elif original_test_type.startswith("Providers["): provider_tests_to_run = ( original_test_type.replace("Providers[", "").replace("]", "").split(",") ) if any(long_test in provider_tests_to_run for long_test in long_tests): current_test_types.remove(original_test_type) for long_test in long_tests: if long_test in provider_tests_to_run: current_test_types.add(f"Providers[{long_test}]") provider_tests_to_run.remove(long_test) current_test_types.add(f"Providers[{','.join(provider_tests_to_run)}]") @cached_property def core_test_types_list_as_strings_in_json(self) -> str | None: if not self.run_tests: return None current_test_types = sorted(set(self._get_core_test_types_to_run())) return json.dumps(_get_test_list_as_json([current_test_types])) @cached_property def providers_test_types_list_as_strings_in_json(self) -> str: if not self.run_tests: return "[]" current_test_types = set(self._get_providers_test_types_to_run()) if self._default_branch != "main": test_types_to_remove: set[str] = set() for test_type in current_test_types: if test_type.startswith("Providers"): get_console().print( f"[warning]Removing {test_type} because the target branch " f"is {self._default_branch} and not main[/]" ) test_types_to_remove.add(test_type) current_test_types = current_test_types - test_types_to_remove self._extract_long_provider_tests(current_test_types) return json.dumps(_get_test_list_as_json([sorted(current_test_types)])) def _get_individual_providers_list(self): current_test_types = set(self._get_providers_test_types_to_run(split_to_individual_providers=True)) if "Providers" in current_test_types: current_test_types.remove("Providers") current_test_types.update( {f"Providers[{provider}]" for provider in get_available_distributions(include_not_ready=True)} ) return current_test_types @cached_property def individual_providers_test_types_list_as_strings_in_json(self) -> str | None: """Splits the list of test types into several lists of strings (to run them in parallel).""" if not self.run_tests: return None current_test_types = sorted(self._get_individual_providers_list()) if not current_test_types: return None # We are hard-coding the number of lists as reasonable starting point to split the # list of test types - and we can modify it in the future # TODO: In Python 3.12 we will be able to use itertools.batched if len(current_test_types) < NUMBER_OF_LOW_DEP_SLICES: return json.dumps(_get_test_list_as_json([current_test_types])) list_of_list_of_types = _split_list(current_test_types, NUMBER_OF_LOW_DEP_SLICES) return json.dumps(_get_test_list_as_json(list_of_list_of_types)) @cached_property def include_success_outputs( self, ) -> bool: return INCLUDE_SUCCESS_OUTPUTS_LABEL in self._pr_labels @cached_property def basic_checks_only(self) -> bool: return not self.ci_image_build @staticmethod def _print_diff(old_lines: list[str], new_lines: list[str]): diff = "\n".join(line for line in difflib.ndiff(old_lines, new_lines) if line and line[0] in "+-?") get_console().print(diff) @cached_property def generated_dependencies_changed(self) -> bool: return "generated/provider_dependencies.json" in self._files @cached_property def hatch_build_changed(self) -> bool: return "hatch_build.py" in self._files @cached_property def any_provider_yaml_or_pyproject_toml_changed(self) -> bool: if not self._commit_ref: get_console().print("[warning]Cannot determine changes as commit is missing[/]") return False for file in self._files: path_file = Path(file) if path_file.name == "provider.yaml" or path_file.name == "pyproject.toml": return True return False @cached_property def pyproject_toml_changed(self) -> bool: if not self._commit_ref: get_console().print("[warning]Cannot determine pyproject.toml changes as commit is missing[/]") return False if "pyproject.toml" not in self._files: return False new_result = run_command( ["git", "show", f"{self._commit_ref}:pyproject.toml"], capture_output=True, text=True, cwd=AIRFLOW_ROOT_PATH, check=False, ) if new_result.returncode != 0: get_console().print( f"[warning]Cannot determine pyproject.toml changes. " f"Could not get pyproject.toml from {self._commit_ref}[/]" ) return False old_result = run_command( ["git", "show", f"{self._commit_ref}^:pyproject.toml"], capture_output=True, text=True, cwd=AIRFLOW_ROOT_PATH, check=False, ) if old_result.returncode != 0: get_console().print( f"[warning]Cannot determine pyproject.toml changes. " f"Could not get pyproject.toml from {self._commit_ref}^[/]" ) return False try: import tomllib except ImportError: import tomli as tomllib self._new_toml = tomllib.loads(new_result.stdout) self._old_toml = tomllib.loads(old_result.stdout) return True @cached_property def build_system_changed_in_pyproject_toml(self) -> bool: if not self.pyproject_toml_changed: return False new_build_backend = self._new_toml["build-system"]["build-backend"] old_build_backend = self._old_toml["build-system"]["build-backend"] if new_build_backend != old_build_backend: get_console().print("[warning]Build backend changed in pyproject.toml [/]") self._print_diff([old_build_backend], [new_build_backend]) return True new_requires = self._new_toml["build-system"]["requires"] old_requires = self._old_toml["build-system"]["requires"] if new_requires != old_requires: get_console().print("[warning]Build system changed in pyproject.toml [/]") self._print_diff(old_requires, new_requires) return True return False @cached_property def upgrade_to_newer_dependencies(self) -> bool: if len(self._matching_files(FileGroupForCi.DEPENDENCY_FILES, CI_FILE_GROUP_MATCHES)) > 0: get_console().print("[warning]Upgrade to newer dependencies: Dependency files changed[/]") return True if self.hatch_build_changed: get_console().print("[warning]Upgrade to newer dependencies: hatch_build.py changed[/]") return True if self.build_system_changed_in_pyproject_toml: get_console().print( "[warning]Upgrade to newer dependencies: Build system changed in pyproject.toml[/]" ) return True if self._github_event in [GithubEvents.PUSH, GithubEvents.SCHEDULE]: get_console().print("[warning]Upgrade to newer dependencies: Push or Schedule event[/]") return True if UPGRADE_TO_NEWER_DEPENDENCIES_LABEL in self._pr_labels: get_console().print( f"[warning]Upgrade to newer dependencies: Label '{UPGRADE_TO_NEWER_DEPENDENCIES_LABEL}' " f"in {self._pr_labels}[/]" ) return True return False @cached_property def docs_list_as_string(self) -> str | None: _ALL_DOCS_LIST = "" if not self.docs_build: return None if self._default_branch != "main": return "apache-airflow docker-stack" if self.full_tests_needed: return _ALL_DOCS_LIST providers_affected = self._find_all_providers_affected( include_docs=True, ) if ( isinstance(providers_affected, AllProvidersSentinel) or "docs/conf.py" in self._files or "docs/build_docs.py" in self._files or self._are_all_providers_affected() ): return _ALL_DOCS_LIST packages = [] if any(file.startswith(("airflow-core/src/airflow/", "airflow-core/docs/")) for file in self._files): packages.append("apache-airflow") if any(file.startswith("providers-summary-docs/") for file in self._files): packages.append("apache-airflow-providers") if any(file.startswith("chart/") for file in self._files): packages.append("helm-chart") if any(file.startswith("docker-stack-docs") for file in self._files): packages.append("docker-stack") if providers_affected: for provider in providers_affected: packages.append(provider.replace("-", ".")) return " ".join(packages) @cached_property def skip_pre_commits(self) -> str: pre_commits_to_skip = set() pre_commits_to_skip.add("identity") # Skip all mypy "individual" file checks if we are running mypy checks in CI # In the CI we always run mypy for the whole "package" rather than for `--all-files` because # The pre-commit will semi-randomly skip such list of files into several groups and we want # to make sure that such checks are always run in CI for whole "group" of files - i.e. # whole package rather than for individual files. That's why we skip those checks in CI # and run them via `mypy-all` command instead and dedicated CI job in matrix # This will also speed up static-checks job usually as the jobs will be running in parallel pre_commits_to_skip.update( { "mypy-providers", "mypy-airflow-core", "mypy-dev", "mypy-task-sdk", "mypy-airflow-ctl", "mypy-devel-common", } ) if self._default_branch != "main": # Skip those tests on all "release" branches pre_commits_to_skip.update( ( "check-airflow-provider-compatibility", "check-extra-packages-references", "check-provider-yaml-valid", "lint-helm-chart", "validate-operators-init", ) ) if self.full_tests_needed: # when full tests are needed, we do not want to skip any checks and we should # run all the pre-commits just to be sure everything is ok when some structural changes occurred return ",".join(sorted(pre_commits_to_skip)) if not ( self._matching_files(FileGroupForCi.UI_FILES, CI_FILE_GROUP_MATCHES) or self._matching_files(FileGroupForCi.API_CODEGEN_FILES, CI_FILE_GROUP_MATCHES) ): pre_commits_to_skip.add("ts-compile-format-lint-ui") if not self._matching_files(FileGroupForCi.ALL_PYTHON_FILES, CI_FILE_GROUP_MATCHES): pre_commits_to_skip.add("flynt") if not self._matching_files( FileGroupForCi.HELM_FILES, CI_FILE_GROUP_MATCHES, ): pre_commits_to_skip.add("lint-helm-chart") if not ( self._matching_files(FileGroupForCi.ALL_PROVIDER_YAML_FILES, CI_FILE_GROUP_MATCHES) or self._matching_files(FileGroupForCi.ALL_PROVIDERS_PYTHON_FILES, CI_FILE_GROUP_MATCHES) ): # only skip provider validation if none of the provider.yaml and provider # python files changed because validation also walks through all the provider python files pre_commits_to_skip.add("check-provider-yaml-valid") return ",".join(sorted(pre_commits_to_skip)) @cached_property def skip_providers_tests(self) -> bool: if self._default_branch != "main": return True if self.full_tests_needed: return False if self._get_providers_test_types_to_run(): return False if not self.run_tests: return True return True @cached_property def docker_cache(self) -> str: return "disabled" if DISABLE_IMAGE_CACHE_LABEL in self._pr_labels else "registry" @cached_property def debug_resources(self) -> bool: return DEBUG_CI_RESOURCES_LABEL in self._pr_labels @cached_property def disable_airflow_repo_cache(self) -> bool: return self.docker_cache == "disabled" @cached_property def helm_test_packages(self) -> str: return json.dumps(all_helm_test_packages()) @cached_property def selected_providers_list_as_string(self) -> str | None: if self._default_branch != "main": return None if self.full_tests_needed: return "" if self._are_all_providers_affected(): return "" affected_providers = self._find_all_providers_affected(include_docs=True) if not affected_providers: return None if isinstance(affected_providers, AllProvidersSentinel): return "" return " ".join(sorted(affected_providers)) @cached_property def amd_runners(self) -> str: return PUBLIC_AMD_RUNNERS @cached_property def arm_runners(self) -> str: return PUBLIC_ARM_RUNNERS @cached_property def has_migrations(self) -> bool: return any([file.startswith("airflow-core/src/airflow/migrations/") for file in self._files]) @cached_property def providers_compatibility_tests_matrix(self) -> str: """Provider compatibility input matrix for the current run. Filter out python versions not built""" return json.dumps( [ check for check in PROVIDERS_COMPATIBILITY_TESTS_MATRIX if check["python-version"] in self.python_versions ] ) @cached_property def excluded_providers_as_string(self) -> str: providers_to_exclude = defaultdict(list) for provider, provider_info in DEPENDENCIES.items(): if "excluded-python-versions" in provider_info: for python_version in provider_info["excluded-python-versions"]: providers_to_exclude[python_version].append(provider) sorted_providers_to_exclude = dict( sorted(providers_to_exclude.items(), key=lambda item: int(item[0].split(".")[1])) ) # ^ sort by Python minor version return json.dumps(sorted_providers_to_exclude) @cached_property def only_new_ui_files(self) -> bool: all_source_files = set(self._matching_files(FileGroupForCi.ALL_SOURCE_FILES, CI_FILE_GROUP_MATCHES)) new_ui_source_files = set(self._matching_files(FileGroupForCi.UI_FILES, CI_FILE_GROUP_MATCHES)) remaining_files = all_source_files - new_ui_source_files if all_source_files and new_ui_source_files and not remaining_files: return True return False @cached_property def testable_core_integrations(self) -> list[str]: if not self.run_tests: return [] return [ integration for integration in TESTABLE_CORE_INTEGRATIONS if integration not in DISABLE_TESTABLE_INTEGRATIONS_FROM_CI ] @cached_property def testable_providers_integrations(self) -> list[str]: if not self.run_tests: return [] return [ integration for integration in TESTABLE_PROVIDERS_INTEGRATIONS if integration not in DISABLE_TESTABLE_INTEGRATIONS_FROM_CI ] @cached_property def is_committer_build(self): if NON_COMMITTER_BUILD_LABEL in self._pr_labels: return False return self._github_actor in COMMITTERS def _find_all_providers_affected(self, include_docs: bool) -> list[str] | AllProvidersSentinel | None: affected_providers: set[str] = set() all_providers_affected = False suspended_providers: set[str] = set() for changed_file in self._files: provider = find_provider_affected(changed_file, include_docs=include_docs) if provider == "Providers": all_providers_affected = True elif provider is not None: if provider not in DEPENDENCIES: suspended_providers.add(provider) else: affected_providers.add(provider) if self.needs_api_tests: affected_providers.add("fab") if self.needs_ol_tests: affected_providers.add("openlineage") if all_providers_affected: return ALL_PROVIDERS_SENTINEL if suspended_providers: # We check for suspended providers only after we have checked if all providers are affected. # No matter if we found that we are modifying a suspended provider individually, # if all providers are # affected, then it means that we are ok to proceed because likely we are running some kind of # global refactoring that affects multiple providers including the suspended one. This is a # potential escape hatch if someone would like to modify suspended provider, # but it can be found at the review time and is anyway harmless as the provider will not be # released nor tested nor used in CI anyway. get_console().print("[yellow]You are modifying suspended providers.\n") get_console().print( "[info]Some providers modified by this change have been suspended, " "and before attempting such changes you should fix the reason for suspension." ) get_console().print( "[info]When fixing it, you should set suspended = false in provider.yaml " "to make changes to the provider." ) get_console().print(f"Suspended providers: {suspended_providers}") if self._fail_if_suspended_providers_affected(): get_console().print( "[error]This PR did not have `allow suspended provider changes`" " label set so it will fail." ) sys.exit(1) else: get_console().print( "[info]This PR had `allow suspended provider changes` label set so it will continue" ) if not affected_providers: return None for provider in list(affected_providers): affected_providers.update( get_related_providers(provider, upstream_dependencies=True, downstream_dependencies=True) ) return sorted(affected_providers) def _is_canary_run(self): return ( self._github_event in [GithubEvents.SCHEDULE, GithubEvents.PUSH, GithubEvents.WORKFLOW_DISPATCH] and self._github_repository == APACHE_AIRFLOW_GITHUB_REPOSITORY ) or CANARY_LABEL in self._pr_labels @classmethod def _find_caplog_in_def(cls, added_lines): """ Find caplog in def :param added_lines: lines added in the file :return: True if caplog is found in def else False """ line_counter = 0 while line_counter < len(added_lines): if all(keyword in added_lines[line_counter] for keyword in ["def", "caplog", "(", ")"]): return True if "def" in added_lines[line_counter] and ")" not in added_lines[line_counter]: while ")" not in added_lines[line_counter]: if "caplog" in added_lines[line_counter]: return True line_counter += 1 line_counter += 1 def _caplog_exists_in_added_lines(self) -> bool: """ Check if caplog is used in added lines :return: True if caplog is used in added lines else False """ lines = run_command( ["git", "diff", f"{self._commit_ref}"], capture_output=True, text=True, cwd=AIRFLOW_ROOT_PATH, check=False, ) if "caplog" not in lines.stdout or lines.stdout == "": return False added_caplog_lines = [ line.lstrip().lstrip("+ ") for line in lines.stdout.split("\n") if line.lstrip().startswith("+ ") ] return self._find_caplog_in_def(added_lines=added_caplog_lines) @cached_property def is_log_mocked_in_the_tests(self) -> bool: """ Check if log is used without mock in the tests """ if self._is_canary_run() or self._github_event not in ( GithubEvents.PULL_REQUEST, GithubEvents.PULL_REQUEST_TARGET, ): return False # Check if changed files are unit tests if ( self._matching_files(FileGroupForCi.UNIT_TEST_FILES, CI_FILE_GROUP_MATCHES) and LOG_WITHOUT_MOCK_IN_TESTS_EXCEPTION_LABEL not in self._pr_labels ): if self._caplog_exists_in_added_lines(): get_console().print( f"[error]Caplog is used in the test. " f"Please be sure you are mocking the log. " "For example, use `patch.object` to mock `log`." "Using `caplog` directly in your test files can cause side effects " "and not recommended. If you think, `caplog` is the only way to test " "the functionality or there is and exceptional case, " "please ask maintainer to include as an exception using " f"'{LOG_WITHOUT_MOCK_IN_TESTS_EXCEPTION_LABEL}' label. " "It is up to maintainer decision to allow this exception.", ) sys.exit(1) else: return True return True @cached_property def force_pip(self): return FORCE_PIP_LABEL in self._pr_labels