images/airflow/2.10.1/python/mwaa/entrypoint.py [195:280]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def _read_requirements_file(requirements_file: str) -> str:
    with open(requirements_file, "rb") as f:
        return auto_decode(f.read())


def _requirements_has_constraints(requirements_file: str):
    content = _read_requirements_file(requirements_file)
    for line in content.splitlines():
        # Notice that this regex check will also match lines with commented out
        # constraints flag. This is intentional as a mechanism for users who want to
        # avoid enforcing the default Airflow constraints, yet don't want to provide a
        # constraints file.
        if re.search(r"-c |--constraint ", line):
            return True
    return False


async def install_user_requirements(cmd: str, environ: dict[str, str]):
    """
    Install user requirements.

    User requirements should be placed in a requirements.txt file and the environment
    variable `MWAA__CORE__REQUIREMENTS_PATH` should be set to the location of that file.
    In a Docker Compose setup, you would usually want to create a volume that maps a
    requirements.txt file in the host machine somewhere in the container, and then set
    the `MWAA__CORE__REQUIREMENTS_PATH` accordingly.

    :param environ: A dictionary containing the environment variables.
    """
    requirements_file = environ.get("MWAA__CORE__REQUIREMENTS_PATH")
    logger.info(f"MWAA__CORE__REQUIREMENTS_PATH = {requirements_file}")
    # For hybrid worker/scheduler containers we publish the requirement install logs
    # to the worker CloudWatch log group.
    logger_prefix = "worker" if cmd == "hybrid" else cmd;
    if requirements_file and os.path.isfile(requirements_file):
        logger.info(f"Installing user requirements from {requirements_file}...")

        subprocess_logger = CompositeLogger(
            "requirements_composite_logging",  # name can be anything unused.
            # We use a set to avoid double logging to console if the user doesn't
            # use CloudWatch for logging.
            *set(
                [
                    logging.getLogger(MWAA_LOGGERS.get(f"{logger_prefix}_requirements")),
                    logger,
                ]
            ),
        )

        extra_args = []
        try:
            if not _requirements_has_constraints(requirements_file):
                subprocess_logger.warning(
                    "WARNING: Constraints should be specified for requirements.txt. "
                    f"Please see {MWAA_DOCS_REQUIREMENTS_GUIDE}"
                )
                subprocess_logger.warning("Forcing local constraints")
                extra_args = ["-c", os.environ["AIRFLOW_CONSTRAINTS_FILE"]]
        except Exception as e:
            subprocess_logger.warning(f"Unable to scan requirements file: {e}")
            subprocess_logger.warning(
                "Cannot determine whether the requirements.txt file has constraints "
                "or not; forcing local constraints."
            )
            extra_args = ["-c", os.environ["AIRFLOW_CONSTRAINTS_FILE"]]

        pip_process = Subprocess(
            cmd=["safe-pip-install", "-r", requirements_file, *extra_args],
            env=environ,
            process_logger=subprocess_logger,
            conditions=[
                TimeoutCondition(USER_REQUIREMENTS_MAX_INSTALL_TIME),
            ],
            friendly_name=f"{logger_prefix}_requirements",
        )
        pip_process.start()
        if pip_process.process and pip_process.process.returncode != 0:
            subprocess_logger.error(
                "ERROR: pip installation exited with a non-zero error code. This could "
                "be the result of package conflict. Notice that MWAA enforces a list "
                "of critical packages, e.g. Airflow, Celery, among others, whose "
                "version cannot be overridden by the customer as that can break our "
                "setup. Please double check your requirements.txt file."
            )
    else:
        logger.info("No user requirements to install.")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



