def action_logging()

in airflow-core/src/airflow/api_fastapi/logging/decorators.py [0:0]


def action_logging(event: str | None = None):
    """
    Build an async dependency that records the current request as a ``Log`` entry.

    :param event: Event name stored on the log entry. When it is ``None`` or empty,
        the ``__name__`` of ``request.scope["endpoint"]`` is used instead.
    :return: The ``log_action`` coroutine function, which takes the request, a session
        and the resolved user, and adds one ``Log`` row to the session.
    """

    async def log_action(
        request: Request,
        session: SessionDep,
        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
    ):
        """
        Log user actions.

        Collects the query parameters, path parameters and (when present and usable)
        the JSON object body of the request, masks them, and adds a ``Log`` entry
        describing the action to ``session``.
        """
        event_name = event or request.scope["endpoint"].__name__

        if not user:
            user_name = "anonymous"
            user_display = ""
        else:
            user_name = user.get_name()
            user_display = user.get_name()

        # Only a JSON *object* body can be inspected key by key. A malformed body or a
        # non-object JSON value (list, string, number, ...) must not make the logging
        # dependency fail the whole request, so such bodies are treated as absent.
        has_json_body = False
        request_body: dict = {}
        if "application/json" in request.headers.get("content-type", "") and await request.body():
            try:
                parsed_body = await request.json()
            except ValueError:
                # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
                logger.warning("Failed to parse the JSON request body; logging the action without it")
            else:
                if isinstance(parsed_body, dict):
                    request_body = parsed_body
                    has_json_body = True
                else:
                    logger.warning("JSON request body is not an object; logging the action without it")

        masked_body_json = {k: secrets_masker.redact(v, k) for k, v in request_body.items()}

        # Query/path parameters with these names are left out of the ``extra`` payload.
        fields_skip_logging = {
            "csrf_token",
            "_csrf_token",
            "is_paused",
            "dag_id",
            "task_id",
            "dag_run_id",
            "run_id",
            "logical_date",
        }

        extra_fields = {
            k: secrets_masker.redact(v, k)
            for k, v in itertools.chain(request.query_params.items(), request.path_params.items())
            if k not in fields_skip_logging
        }
        # Variable and connection events go through their dedicated masking helpers,
        # fed with the raw body when there is one and with the already-collected
        # query/path fields otherwise.
        if "variable" in event_name:
            extra_fields = _mask_variable_fields(dict(request_body) if has_json_body else extra_fields)
        elif "connection" in event_name:
            extra_fields = _mask_connection_fields(dict(request_body) if has_json_body else extra_fields)
        elif has_json_body:
            extra_fields = {**extra_fields, **masked_body_json}

        # Unfiltered view of all parameters; later sources override earlier ones
        # (query < path < masked JSON body).
        params = {
            **request.query_params,
            **request.path_params,
        }

        if has_json_body:
            params.update(masked_body_json)
        if params and "is_paused" in params:
            # NOTE(review): this is True only when the value is the string "false"; a
            # boolean coming from a JSON body always yields False here. Confirm intent.
            extra_fields["is_paused"] = params["is_paused"] == "false"

        extra_fields["method"] = request.method

        # Create log entry
        log = Log(
            event=event_name,
            task_instance=None,
            owner=user_name,
            owner_display_name=user_display,
            extra=json.dumps(extra_fields),
            task_id=params.get("task_id"),
            dag_id=params.get("dag_id"),
            run_id=params.get("run_id") or params.get("dag_run_id"),
        )

        # The logical date is only taken from the query string; a value that cannot be
        # parsed is logged and the entry is stored without a logical date.
        if "logical_date" in request.query_params:
            logical_date_value = request.query_params.get("logical_date")
            if logical_date_value:
                try:
                    log.logical_date = pendulum.parse(logical_date_value, strict=False)
                except ParserError:
                    logger.exception("Failed to parse logical_date from the request: %s", logical_date_value)
            else:
                logger.warning("Logical date is missing or empty")
        session.add(log)

    return log_action