def execute_startup_script()

in images/airflow/2.10.1/python/mwaa/entrypoint.py [0:0]


def execute_startup_script(cmd: str, environ: Dict[str, str]) -> Dict[str, str]:
    """
    Execute user startup script.

    The location of the script is read from the ``MWAA__CORE__STARTUP_SCRIPT_PATH``
    variable of the current process environment (``os.environ``), not from the
    ``environ`` argument. When a file exists at that location, the
    ``execute-user-startup-script`` and ``post-startup-script-verification``
    bash scripts are run one after the other, and the environment variables
    file ``/tmp/customer_env_vars.json`` is then loaded and returned.

    :param cmd: The MWAA command the container is running, e.g. "worker",
        "scheduler". This is used for logging purposes so the logs of the
        execution of the startup script get sent to the correct place.
    :param environ: A dictionary containing the environment variables. It is
        passed as the environment of both subprocesses.
    :returns: The content of ``/tmp/customer_env_vars.json`` as parsed by
        ``json.load`` (expected to be a mapping of variable names to values;
        the shape is not validated here), or an empty dictionary when no
        startup script path is configured or no file exists at that path.
    :raises Exception: If ``/tmp/customer_env_vars.json`` is missing after the
        scripts ran, or if it cannot be opened or parsed. In the latter case
        the underlying error is attached as ``__cause__``.
    """
    startup_script_path = os.environ.get("MWAA__CORE__STARTUP_SCRIPT_PATH", "")
    if not startup_script_path:
        logger.info("MWAA__CORE__STARTUP_SCRIPT_PATH is not provided.")
        return {}

    execute_user_startup_script_path = "execute-user-startup-script"
    post_startup_script_verification_path = "post-startup-script-verification"
    # For hybrid worker/scheduler containers we publish the startup script logs
    # to the worker CloudWatch log group.
    process_logger_prefix = "worker" if cmd == "hybrid" else cmd
    # NOTE(review): if MWAA_LOGGERS has no "<prefix>_startup" entry and .get()
    # yields None, logging.getLogger(None) returns the root logger - confirm
    # that this fallback is intended.
    process_logger = logging.getLogger(
        MWAA_LOGGERS.get(f"{process_logger_prefix}_startup")
    )

    if not os.path.isfile(startup_script_path):
        logger.info(f"No startup script found at {startup_script_path}.")
        return {}

    logger.info("Executing customer startup script.")

    # The reported duration is the wall-clock time spent inside start();
    # presumably start() blocks until the script exits or the timeout
    # condition stops it - verify against the Subprocess implementation.
    start_time = time.time()
    startup_script_process = Subprocess(
        cmd=["/bin/bash", execute_user_startup_script_path],
        env=environ,
        process_logger=process_logger,
        conditions=[
            TimeoutCondition(STARTUP_SCRIPT_MAX_EXECUTION_TIME),
        ],
        friendly_name=f"{process_logger_prefix}_startup",
        sigterm_patience_interval=STARTUP_SCRIPT_SIGTERM_PATIENCE_INTERVAL,
    )
    startup_script_process.start()
    duration = time.time() - start_time
    process_logger.info(f"Startup script execution time: {duration:.2f} seconds.")

    logger.info("Executing post startup script verification.")
    verification_process = Subprocess(
        cmd=["/bin/bash", post_startup_script_verification_path],
        env=environ,
        process_logger=process_logger,
        conditions=[
            TimeoutCondition(STARTUP_SCRIPT_MAX_EXECUTION_TIME),
        ],
        friendly_name=f"{process_logger_prefix}_startup",
    )
    verification_process.start()

    customer_env_vars_path = "/tmp/customer_env_vars.json"
    if not os.path.isfile(customer_env_vars_path):
        logger.error(
            "An unexpected error occurred: the file containing the customer-defined "
            "environment variables could not be located. If the customer's startup "
            "script defines environment variables, this error message indicates that "
            "those variables won't be exported to the Airflow tasks."
        )
        process_logger.error(
            "[ERROR] An unexpected error occurred: Failed to locate environment "
            "variables file from startup script."
        )
        raise Exception(
            "Failed to access customer environment variables file: Service was "
            "unable to create or locate /tmp/customer_env_vars.json"
        )

    try:
        with open(customer_env_vars_path, "r") as f:
            customer_env_dict = json.load(f)
    except Exception as e:
        logger.error(f"Error reading the customer's environment variables: {e}")
        process_logger.error(
            "[ERROR] Failed to load environment variables from startup script. "
            "Please verify your startup script configuration."
        )
        # Chain explicitly so the original I/O or JSON error is kept as the
        # cause of the generic exception surfaced to the caller.
        raise Exception(
            f"Failed to read customer's environment variables from startup script: {e}"
        ) from e

    logger.info("Successfully read the customer's environment variables.")
    return customer_env_dict