in airflow-core/src/airflow/api_fastapi/logging/decorators.py [0:0]
def action_logging(event: str | None = None):
    """
    Build a FastAPI dependency that writes an audit ``Log`` row for the current request.

    :param event: Event name stored on the log row. When omitted, the ``__name__`` of the
        endpoint function found in ``request.scope["endpoint"]`` is used instead.
    :return: The async ``log_action`` dependency callable.
    """

    async def log_action(
        request: Request,
        session: SessionDep,
        user: Annotated[BaseUser, Depends(get_user_with_exception_handling)],
    ):
        """
        Log user actions.

        Collects query parameters, path parameters and (when present) the JSON object body,
        redacts them, and adds a ``Log`` row to ``session``. The session is not committed here.
        """
        event_name = event or request.scope["endpoint"].__name__

        if not user:
            user_name = "anonymous"
            user_display = ""
        else:
            user_name = user_display = user.get_name()

        # Only a JSON *object* body can be merged into the logged fields. A malformed payload
        # or a non-object payload (list, scalar, null) must not make the audit dependency
        # itself raise, so those bodies are left out of the log entry.
        request_body: dict = {}
        masked_body_json: dict = {}
        has_json_body = False
        if "application/json" in request.headers.get("content-type", "") and await request.body():
            try:
                parsed_body = await request.json()
            except ValueError:
                # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
                logger.warning("Request body is not valid JSON, skipping it in the audit log")
            else:
                if isinstance(parsed_body, dict):
                    has_json_body = True
                    request_body = parsed_body
                    masked_body_json = {k: secrets_masker.redact(v, k) for k, v in request_body.items()}
                else:
                    logger.debug(
                        "JSON request body is a %s, not an object, skipping it in the audit log",
                        type(parsed_body).__name__,
                    )

        # These keys are excluded from the ``extra`` payload built from query/path parameters.
        fields_skip_logging = {
            "csrf_token",
            "_csrf_token",
            "is_paused",
            "dag_id",
            "task_id",
            "dag_run_id",
            "run_id",
            "logical_date",
        }
        extra_fields = {
            k: secrets_masker.redact(v, k)
            for k, v in itertools.chain(request.query_params.items(), request.path_params.items())
            if k not in fields_skip_logging
        }

        # Variable and connection events go through their dedicated masking helpers, which
        # receive the raw body when there is one and the already-redacted parameters otherwise.
        if "variable" in event_name:
            extra_fields = _mask_variable_fields(dict(request_body) if has_json_body else extra_fields)
        elif "connection" in event_name:
            extra_fields = _mask_connection_fields(dict(request_body) if has_json_body else extra_fields)
        elif has_json_body:
            extra_fields = {**extra_fields, **masked_body_json}

        # Later sources win on key clashes: query params < path params < JSON body.
        params = {
            **request.query_params,
            **request.path_params,
        }
        if has_json_body:
            params.update(masked_body_json)

        if "is_paused" in params:
            # NOTE(review): the comparison is against the string "false", so it presumably only
            # matches values that arrive as strings (query parameters) - confirm this is intended
            # for values supplied through a JSON body.
            extra_fields["is_paused"] = params["is_paused"] == "false"

        extra_fields["method"] = request.method

        # Create log entry
        log = Log(
            event=event_name,
            task_instance=None,
            owner=user_name,
            owner_display_name=user_display,
            extra=json.dumps(extra_fields),
            task_id=params.get("task_id"),
            dag_id=params.get("dag_id"),
            run_id=params.get("run_id") or params.get("dag_run_id"),
        )

        # The logical date is only read from the query string, never from the body.
        if "logical_date" in request.query_params:
            logical_date_value = request.query_params.get("logical_date")
            if logical_date_value:
                try:
                    log.logical_date = pendulum.parse(logical_date_value, strict=False)
                except ParserError:
                    # A bad date must not prevent the audit row from being written.
                    logger.exception("Failed to parse logical_date from the request: %s", logical_date_value)
            else:
                logger.warning("Logical date is missing or empty")

        session.add(log)

    return log_action