def _build_metrics()

in airflow-core/src/airflow/utils/cli.py [0:0]


def _mask_conn_json_for_metrics(raw_json, mask="*" * 8):
    """
    Mask the values of sensitive top-level keys in a ``--conn-json`` argument.

    :param raw_json: text passed on the command line as the ``--conn-json`` value
    :param mask: replacement text for sensitive values
    :return: re-serialized JSON with sensitive values replaced by ``mask``; ``mask`` alone when
        ``raw_json`` is not a JSON object, since it then cannot be inspected key by key
    """
    import json

    try:
        conn_json = json.loads(raw_json)
    except ValueError:
        # json.JSONDecodeError subclasses ValueError. Building metrics must not fail on
        # malformed input, and the raw text may still contain a secret, so hide all of it.
        return mask
    if not isinstance(conn_json, dict):
        return mask
    for k in conn_json:
        if k and should_hide_value_for_key(k):
            conn_json[k] = mask
    return json.dumps(conn_json)


def _mask_conn_uri_for_metrics(raw_uri, mask="*" * 8):
    """
    Mask the password embedded in a ``--conn-uri`` argument.

    Only the password part of the netloc is rewritten; the user name, host and port are kept
    exactly as typed (so IPv6 brackets and host casing survive).

    :param raw_uri: text passed on the command line as the ``--conn-uri`` value
    :param mask: replacement text for the password
    :return: the URI with its password replaced by ``mask``; ``mask`` alone when the URI cannot
        be parsed
    """
    from urllib.parse import urlparse, urlunparse

    try:
        parsed_uri = urlparse(raw_uri)
    except ValueError:
        # urlparse rejects e.g. unbalanced IPv6 brackets; an unparsable value cannot be
        # masked selectively, so hide all of it.
        return mask

    netloc = parsed_uri.netloc
    if parsed_uri.password:
        # Split the raw netloc instead of rebuilding it from ``hostname``/``port``: those
        # accessors strip IPv6 brackets, lower-case the host, return None for an empty host
        # and raise ValueError for an invalid port.
        userinfo, _, host_and_port = netloc.rpartition("@")
        username = userinfo.partition(":")[0]
        netloc = f"{username}:{mask}@{host_and_port}"

    return urlunparse(
        (
            parsed_uri.scheme,
            netloc,
            parsed_uri.path,
            parsed_uri.params,
            parsed_uri.query,
            parsed_uri.fragment,
        )
    )


def _build_metrics(func_name, namespace):
    """
    Build metrics dict from function args.

    It assumes that the function arguments come from a CLI command function and include a
    Namespace instance that optionally contains "dag_id", "task_id", and "logical_date".

    The command line recorded under "full_command" is a copy of ``sys.argv`` in which
    passwords, sensitive variable values and connection secrets are replaced by asterisks.

    :param func_name: name of function
    :param namespace: Namespace instance from argparse
    :return: dict with metrics
    :raises ValueError: if ``namespace`` is not an ``argparse.Namespace`` instance
    """
    mask = "*" * 8
    sub_commands_to_check_for_sensitive_fields = {"users", "connections"}
    sub_commands_to_check_for_sensitive_key = {"variables"}
    sensitive_fields = {"-p", "--password", "--conn-password"}
    full_command = list(sys.argv)
    sub_command = full_command[1] if len(full_command) > 1 else None
    # For cases when value under sub_commands_to_check_for_sensitive_key have sensitive info
    if sub_command in sub_commands_to_check_for_sensitive_key:
        key = full_command[-2] if len(full_command) > 3 else None
        if key and should_hide_value_for_key(key):
            # Mask the sensitive value since key contain sensitive keyword
            full_command[-1] = mask
    elif sub_command in sub_commands_to_check_for_sensitive_fields:
        for idx, command in enumerate(full_command):
            if command in sensitive_fields:
                # For cases when password is passed as "--password xyz" (with space between key and value).
                # The flag may be the last token (value missing); there is then nothing to mask.
                if idx + 1 < len(full_command):
                    full_command[idx + 1] = mask
            else:
                # For cases when password is passed as "--password=xyz" (with '=' between key and value)
                for sensitive_field in sensitive_fields:
                    if command.startswith(f"{sensitive_field}="):
                        full_command[idx] = f"{sensitive_field}={mask}"

    # handle conn-json and conn-uri separately as it requires different handling
    value_maskers = {
        "--conn-json": _mask_conn_json_for_metrics,
        "--conn-uri": _mask_conn_uri_for_metrics,
    }
    for idx, command in enumerate(full_command):
        flag, has_inline_value, inline_value = command.partition("=")
        masker = value_maskers.get(flag)
        if masker is None:
            continue
        if has_inline_value:
            # "--conn-uri=<value>" form: the secret lives in the same token as the flag.
            full_command[idx] = f"{flag}={masker(inline_value, mask)}"
        elif idx + 1 < len(full_command):
            # "--conn-uri <value>" form; skipped when the flag is the last token.
            full_command[idx + 1] = masker(full_command[idx + 1], mask)

    metrics = {
        "sub_command": func_name,
        "start_datetime": timezone.utcnow(),
        "full_command": f"{full_command}",
        "user": getuser(),
    }

    if not isinstance(namespace, Namespace):
        raise ValueError(
            f"namespace argument should be argparse.Namespace instance, but is {type(namespace)}"
        )
    tmp_dic = vars(namespace)
    metrics["dag_id"] = tmp_dic.get("dag_id")
    metrics["task_id"] = tmp_dic.get("task_id")
    metrics["logical_date"] = tmp_dic.get("logical_date")
    metrics["host_name"] = socket.gethostname()

    return metrics