images/airflow/2.10.1/python/mwaa/entrypoint.py [17:225]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    DAG_PROCESSOR_LOGGER_NAME,
    LOGGING_CONFIG,
    SCHEDULER_LOGGER_NAME,
    TRIGGERER_LOGGER_NAME,
    WEBSERVER_LOGGER_NAME,
    WORKER_LOGGER_NAME,
)
logging.config.dictConfig(LOGGING_CONFIG)
# fmt: on

# Python imports
from datetime import timedelta
from functools import cache
from typing import Callable, Dict, List, Optional
import asyncio
import json
import logging
import os
import re
import shlex
import sys
import time

# 3rd party imports
import boto3
from botocore.exceptions import ClientError

# Our imports
from mwaa.celery.task_monitor import WorkerTaskMonitor
from mwaa.config.airflow import (
    get_essential_airflow_config,
    get_opinionated_airflow_config,
    get_user_airflow_config,
)
from mwaa.config.environ import get_essential_environ, get_opinionated_environ
from mwaa.config.sqs import (
    get_sqs_queue_name,
    should_create_queue,
)
from mwaa.logging.config import MWAA_LOGGERS
from mwaa.logging.loggers import CompositeLogger
from mwaa.subprocess.conditions import (
    SIDECAR_DEFAULT_HEALTH_PORT,
    AirflowDbReachableCondition,
    TaskMonitoringCondition,
    ProcessCondition,
    SidecarHealthCondition,
    TimeoutCondition,
)
from mwaa.subprocess.subprocess import Subprocess, run_subprocesses
from mwaa.utils.cmd import run_command
from mwaa.utils.dblock import with_db_lock
from mwaa.utils.encoding import auto_decode


# Usually, we would pass the `__name__` variable here, as it defaults to the
# module path, i.e. `mwaa.entrypoint` in this case. However, since this is the
# entrypoint script, `__name__` will have the value of `__main__`, hence we
# hard-code the module path.
logger = logging.getLogger("mwaa.entrypoint")

# TODO Fix the "type: ignore"s in this file.

AVAILABLE_COMMANDS = [
    "webserver",
    "scheduler",
    "worker",
    "hybrid",
    "shell",
    "resetdb",
    "spy",
    "test-requirements",
    "test-startup-script",
]
MWAA_DOCS_REQUIREMENTS_GUIDE = "https://docs.aws.amazon.com/mwaa/latest/userguide/working-dags-dependencies.html#working-dags-dependencies-test-create"
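# The two constants below split a five-minute total budget for the user startup
# script: presumably the script may run for up to the max execution time, after
# which it is sent a SIGTERM and given the patience interval to exit.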
STARTUP_SCRIPT_SIGTERM_PATIENCE_INTERVAL = timedelta(seconds=5)
STARTUP_SCRIPT_MAX_EXECUTION_TIME = (
    timedelta(minutes=5) - STARTUP_SCRIPT_SIGTERM_PATIENCE_INTERVAL
)
USER_REQUIREMENTS_MAX_INSTALL_TIME = timedelta(minutes=9)

# Save the start time of the container. This is used later with the sidecar
# monitoring, since we need a grace period before we start reporting timeouts
# related to the sidecar endpoint not reporting health messages.
CONTAINER_START_TIME = time.time()

# The hybrid container runs both the scheduler and the worker as essential
# subprocesses. Therefore, the default worker patience is increased to mitigate
# task failures caused by a scheduler failure.
HYBRID_WORKER_SIGTERM_PATIENCE_INTERVAL_DEFAULT = timedelta(seconds=130)


async def airflow_db_init(environ: dict[str, str]):
    """
    Initialize Airflow database.

    Before Airflow can be used, `airflow db migrate` must be run. This function does
    that, and is called in the entrypoint to make sure the database is initialized
    before any Airflow component starts.

    :param environ: A dictionary containing the environment variables.
    """
    await run_command("python3 -m mwaa.database.migrate", env=environ)


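# The numeric arguments passed to @with_db_lock below are arbitrary lock IDs:
# presumably they ensure that at most one container performs the decorated
# operation at a time, via a database-level lock.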
@with_db_lock(4321)
async def airflow_db_reset(environ: dict[str, str]):
    """
    Reset Airflow metadata database.

    This function resets the Airflow metadata database. It is called when the `resetdb`
    command is specified.

    :param environ: A dictionary containing the environment variables.
    """
    logger.info("Resetting Airflow metadata database.")
    await run_command("airflow db reset --yes", env=environ)


@with_db_lock(5678)
async def create_airflow_user(environ: dict[str, str]):
    """
    Create the 'airflow' user.

    To be able to log in to the webserver, you need a user. This function creates a
    user with default credentials.

    Notice that this should only be used in a development context. In production,
    other means need to be employed to create users with strong passwords.
    Alternatively, with the MWAA setup, a plugin is employed to integrate with IAM
    (not implemented yet).

    :param environ: A dictionary containing the environment variables.
    """
    logger.info("Calling 'airflow users create' to create the webserver user.")
    await run_command(
        "airflow users create "
        "--username airflow "
        "--firstname Airflow "
        "--lastname Admin "
        "--email airflow@example.com "
        "--role Admin "
        "--password airflow",
        env=environ,
    )
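    # With the defaults above, the local webserver can be accessed using the
    # airflow/airflow username/password pair.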


@with_db_lock(1357)
def create_queue() -> None:
    """
    Create the SQS queue required by Celery.

    In our setup, we use SQS as the broker for Celery. Usually, the queue should be
    created beforehand. However, sometimes you might want to create the SQS queue
    during startup. One such example is when using the elasticmq server as a mock
    SQS server.
    """
    if not should_create_queue():
        return
    queue_name = get_sqs_queue_name()
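    # A custom endpoint is typically only set for local development, e.g. to
    # point at an elasticmq container (for instance "http://elasticmq:9324",
    # its default port); when unset, boto3 uses the standard AWS SQS endpoint.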
    endpoint = os.environ.get("MWAA__SQS__CUSTOM_ENDPOINT")
    sqs = boto3.client("sqs", endpoint_url=endpoint)  # type: ignore
    try:
        # Call get_queue_url to check whether the queue exists.
        sqs.get_queue_url(QueueName=queue_name)  # type: ignore
        logger.info(f"Queue {queue_name} already exists.")
    except ClientError as e:
        # If the queue does not exist, create it
        if (
            e.response.get("Error", {}).get("Message")  # type: ignore
            == "The specified queue does not exist."
        ):
            response = sqs.create_queue(QueueName=queue_name)  # type: ignore
            queue_url = response["QueueUrl"]  # type: ignore
            logger.info(f"Queue created: {queue_url}")
        else:
            # If there is a different error, re-raise it; a bare `raise`
            # preserves the original traceback.
            raise


def _read_requirements_file(requirements_file: str) -> str:
    with open(requirements_file, "rb") as f:
        return auto_decode(f.read())


def _requirements_has_constraints(requirements_file: str) -> bool:
    content = _read_requirements_file(requirements_file)
    for line in content.splitlines():
        # Notice that this regex check will also match lines with commented out
        # constraints flag. This is intentional as a mechanism for users who want to
        # avoid enforcing the default Airflow constraints, yet don't want to provide a
        # constraints file.
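        # Illustrative examples (hypothetical lines, not from a real file):
        #   "-c constraints.txt"            -> match
        #   "--constraint constraints.txt"  -> match
        #   "# -c constraints.txt"          -> match (commented out, by design)
        #   "some-package==1.0.0"           -> no match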
        if re.search(r"-c |--constraint ", line):
            return True
    return False


async def install_user_requirements(cmd: str, environ: dict[str, str]):
    """
    Install user requirements.

    User requirements should be placed in a requirements.txt file and the environment
    variable `MWAA__CORE__REQUIREMENTS_PATH` should be set to the location of that file.
    In a Docker Compose setup, you would usually want to create a volume that maps a
    requirements.txt file on the host machine to somewhere in the container, and then
    set `MWAA__CORE__REQUIREMENTS_PATH` accordingly.

    :param cmd: The command the entrypoint was invoked with, e.g. ``worker``.
    :param environ: A dictionary containing the environment variables.
    """
    requirements_file = environ.get("MWAA__CORE__REQUIREMENTS_PATH")
    logger.info(f"MWAA__CORE__REQUIREMENTS_PATH = {requirements_file}")
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



images/airflow/2.10.3/python/mwaa/entrypoint.py [17:225]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
(identical to the 2.10.1 snippet above)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



