in airflow-core/src/airflow/utils/cli.py [0:0]
def _mask_conn_json_value(value: str) -> str:
    """
    Return a ``--conn-json`` argument with sensitive fields masked.

    Every object key for which ``should_hide_value_for_key`` is true has its value
    replaced by ``********``, including keys of nested objects (e.g. inside ``extra``)
    and objects inside lists. A value that is not valid JSON is masked entirely,
    because its content cannot be inspected and may still contain secrets.

    :param value: raw JSON text passed on the command line
    :return: JSON text with sensitive values masked, or ``********``
    """
    import json

    def _mask(obj):
        # Walk dicts/lists; mask values whose key looks sensitive, keep everything else.
        if isinstance(obj, dict):
            return {
                key: "*" * 8 if key and should_hide_value_for_key(key) else _mask(item)
                for key, item in obj.items()
            }
        if isinstance(obj, list):
            return [_mask(item) for item in obj]
        return obj

    try:
        parsed = json.loads(value)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return "*" * 8
    return json.dumps(_mask(parsed))


def _mask_conn_uri_value(value: str) -> str:
    """
    Return a ``--conn-uri`` argument with its password component masked.

    Only the password part of the netloc is replaced. Scheme, username, host
    (including case and IPv6 brackets), port, path, query and fragment are kept as
    given. A URI that ``urlparse`` rejects is masked entirely.

    :param value: raw URI passed on the command line
    :return: URI with the password replaced by ``********``, or ``********``
    """
    from urllib.parse import urlparse, urlunparse

    try:
        parsed_uri = urlparse(value)
    except ValueError:  # e.g. malformed IPv6 netloc; cannot tell where the password is
        return "*" * 8
    if not parsed_uri.password:
        return value
    # Split the netloc exactly the way urllib derives ``password``: the userinfo is
    # everything before the LAST "@", and the password follows the FIRST ":" in it.
    userinfo, _, hostinfo = parsed_uri.netloc.rpartition("@")
    username = userinfo.partition(":")[0]
    netloc = f"{username}:{'*' * 8}@{hostinfo}"
    return urlunparse(parsed_uri._replace(netloc=netloc))


def _build_metrics(func_name, namespace):
    """
    Build metrics dict from function args.

    It assumes that the function arguments come from an airflow CLI command function
    and include a Namespace instance that optionally contains "dag_id", "task_id",
    and "logical_date".

    The command line recorded under ``full_command`` is a copy of ``sys.argv`` with
    secrets masked as ``********``:

    * ``variables`` sub-command: the last argument, when the key before it looks sensitive.
    * ``users`` / ``connections`` sub-commands: the value of ``-p``, ``--password`` and
      ``--conn-password`` (both ``--opt value`` and ``--opt=value`` forms). Sensitive
      fields inside ``--conn-json``, and the password inside ``--conn-uri``, are also
      masked (both forms, every occurrence).

    :param func_name: name of function
    :param namespace: Namespace instance from argparse
    :return: dict with metrics
    :raises ValueError: if ``namespace`` is not an ``argparse.Namespace`` instance
    """
    if not isinstance(namespace, Namespace):
        raise ValueError(
            f"namespace argument should be argparse.Namespace instance, but is {type(namespace)}"
        )
    mask = "*" * 8
    sub_commands_to_check_for_sensitive_fields = {"users", "connections"}
    sub_commands_to_check_for_sensitive_key = {"variables"}
    sensitive_fields = {"-p", "--password", "--conn-password"}
    # Options whose values are structured; they are masked field-by-field instead of wholesale.
    structured_field_maskers = {
        "--conn-json": _mask_conn_json_value,
        "--conn-uri": _mask_conn_uri_value,
    }
    full_command = list(sys.argv)
    sub_command = full_command[1] if len(full_command) > 1 else None
    if sub_command in sub_commands_to_check_for_sensitive_key:
        # For cases when value under sub_commands_to_check_for_sensitive_key have sensitive info
        key = full_command[-2] if len(full_command) > 3 else None
        if key and should_hide_value_for_key(key):
            # Mask the sensitive value since key contain sensitive keyword
            full_command[-1] = mask
    elif sub_command in sub_commands_to_check_for_sensitive_fields:
        idx = 0
        while idx < len(full_command):
            arg = full_command[idx]
            option, has_inline_value, inline_value = arg.partition("=")
            has_next = idx + 1 < len(full_command)
            if arg in sensitive_fields:
                # "--password xyz": the secret is the next argument (absent if the option is last)
                if has_next:
                    full_command[idx + 1] = mask
                idx += 2  # skip the value we just consumed
            elif arg in structured_field_maskers:
                # "--conn-json '{...}'" / "--conn-uri uri": mask secrets inside the next argument
                if has_next:
                    full_command[idx + 1] = structured_field_maskers[arg](full_command[idx + 1])
                idx += 2
            elif has_inline_value and option in sensitive_fields:
                # "--password=xyz"
                full_command[idx] = f"{option}={mask}"
                idx += 1
            elif has_inline_value and option in structured_field_maskers:
                # "--conn-json={...}" / "--conn-uri=uri"
                full_command[idx] = f"{option}={structured_field_maskers[option](inline_value)}"
                idx += 1
            else:
                idx += 1
    metrics = {
        "sub_command": func_name,
        "start_datetime": timezone.utcnow(),
        "full_command": f"{full_command}",
        "user": getuser(),
    }
    tmp_dic = vars(namespace)
    metrics["dag_id"] = tmp_dic.get("dag_id")
    metrics["task_id"] = tmp_dic.get("task_id")
    metrics["logical_date"] = tmp_dic.get("logical_date")
    metrics["host_name"] = socket.gethostname()
    return metrics