images/airflow/2.9.2/python/mwaa/utils/user_requirements.py (72 lines of code) (raw):

""" User requirements handling module for Amazon MWAA. This module manages user-defined Python package requirements and their installation process in Amazon MWAA environments. It handles requirement file parsing, validation, and package installation procedures. """ # Python imports from datetime import timedelta import logging import os import re # Our imports from mwaa.logging.config import MWAA_LOGGERS from mwaa.logging.loggers import CompositeLogger from mwaa.subprocess.subprocess import Subprocess from mwaa.subprocess.conditions import TimeoutCondition from mwaa.utils.encoding import auto_decode MWAA_DOCS_REQUIREMENTS_GUIDE = "https://docs.aws.amazon.com/mwaa/latest/userguide/working-dags-dependencies.html#working-dags-dependencies-test-create" USER_REQUIREMENTS_MAX_INSTALL_TIME = timedelta(minutes=9) logger = logging.getLogger("mwaa.entrypoint") def _read_requirements_file(requirements_file: str) -> str: with open(requirements_file, "rb") as f: return auto_decode(f.read()) def _requirements_has_constraints(requirements_file: str): content = _read_requirements_file(requirements_file) for line in content.splitlines(): # Notice that this regex check will also match lines with commented out # constraints flag. This is intentional as a mechanism for users who want to # avoid enforcing the default Airflow constraints, yet don't want to provide a # constraints file. if re.search(r"-c |--constraint ", line): return True return False async def install_user_requirements(cmd: str, environ: dict[str, str]): """ Install user requirements. User requirements should be placed in a requirements.txt file and the environment variable `MWAA__CORE__REQUIREMENTS_PATH` should be set to the location of that file. In a Docker Compose setup, you would usually want to create a volume that maps a requirements.txt file in the host machine somewhere in the container, and then set the `MWAA__CORE__REQUIREMENTS_PATH` accordingly. :param environ: A dictionary containing the environment variables. """ requirements_file = environ.get("MWAA__CORE__REQUIREMENTS_PATH") logger.info(f"MWAA__CORE__REQUIREMENTS_PATH = {requirements_file}") # For hybrid worker/scheduler containers we publish the requirement install logs # to the worker CloudWatch log group. logger_prefix = "worker" if cmd == "hybrid" else cmd; if requirements_file and os.path.isfile(requirements_file): logger.info(f"Installing user requirements from {requirements_file}...") subprocess_logger = CompositeLogger( "requirements_composite_logging", # name can be anything unused. # We use a set to avoid double logging to console if the user doesn't # use CloudWatch for logging. *set( [ logging.getLogger(MWAA_LOGGERS.get(f"{logger_prefix}_requirements")), logger, ] ), ) extra_args = [] try: if not _requirements_has_constraints(requirements_file): subprocess_logger.warning( "WARNING: Constraints should be specified for requirements.txt. " f"Please see {MWAA_DOCS_REQUIREMENTS_GUIDE}" ) subprocess_logger.warning("Forcing local constraints") extra_args = ["-c", os.environ["AIRFLOW_CONSTRAINTS_FILE"]] except Exception as e: subprocess_logger.warning(f"Unable to scan requirements file: {e}") subprocess_logger.warning( "Cannot determine whether the requirements.txt file has constraints " "or not; forcing local constraints." ) extra_args = ["-c", os.environ["AIRFLOW_CONSTRAINTS_FILE"]] pip_process = Subprocess( cmd=["safe-pip-install", "-r", requirements_file, *extra_args], env=environ, process_logger=subprocess_logger, conditions=[ TimeoutCondition(USER_REQUIREMENTS_MAX_INSTALL_TIME), ], friendly_name=f"{logger_prefix}_requirements", ) pip_process.start() if pip_process.process and pip_process.process.returncode != 0: subprocess_logger.error( "ERROR: pip installation exited with a non-zero error code. This could " "be the result of package conflict. Notice that MWAA enforces a list " "of critical packages, e.g. Airflow, Celery, among others, whose " "version cannot be overridden by the customer as that can break our " "setup. Please double check your requirements.txt file." ) else: logger.info("No user requirements to install.")