def connections_add()

in airflow-core/src/airflow/cli/commands/connection_command.py [0:0]


def connections_add(args):
    """
    Add new connection.

    The connection is described by exactly one of ``args.conn_uri``,
    ``args.conn_json`` or the individual ``args.conn_*`` fields (in which case
    ``args.conn_type`` is required). The resulting ``Connection`` is added to
    the session unless a row with the same ``conn_id`` is already present.

    :param args: Parsed CLI namespace. The attributes read here are ``conn_id``,
        ``conn_uri``, ``conn_json``, ``conn_type``, ``conn_description``,
        ``conn_host``, ``conn_login``, ``conn_password``, ``conn_schema``,
        ``conn_port`` and ``conn_extra``, plus every name listed in
        ``alternative_conn_specs``. ``args.conn_type`` is overwritten with
        ``"generic"`` when the supplied type is not a supported one.
    :raises SystemExit: If the connection id fails validation, the combination
        of arguments is invalid, the URI is rejected by ``_valid_uri``, the JSON
        yields no connection type, or a connection with the same ``conn_id``
        already exists.
    """
    has_uri = bool(args.conn_uri)
    has_json = bool(args.conn_json)
    has_type = bool(args.conn_type)

    # Validate connection-id
    try:
        helpers.validate_key(args.conn_id, max_length=200)
    except Exception as e:
        # Chain explicitly so the validation error stays attached as the cause.
        raise SystemExit(f"Could not create connection. {e}") from e

    if not has_type and not (has_json or has_uri):
        raise SystemExit("Must supply either conn-uri or conn-json if not supplying conn-type")

    if has_json and has_uri:
        raise SystemExit("Cannot supply both conn-uri and conn-json")

    if has_type:
        # Fetch the supported types once: both the membership test and the
        # warning text below need them.
        supported_types = _get_connection_types()
        if args.conn_type not in supported_types:
            warnings.warn(
                f"The type provided to --conn-type is invalid: {args.conn_type}", UserWarning, stacklevel=4
            )
            warnings.warn(
                f"Supported --conn-types are:{supported_types}. "
                "Hence overriding the conn-type with generic",
                UserWarning,
                stacklevel=4,
            )
            # An unknown type is not fatal; fall back to the generic type.
            args.conn_type = "generic"

    if has_uri or has_json:
        if has_uri and not _valid_uri(args.conn_uri):
            raise SystemExit(f"The URI provided to --conn-uri is invalid: {args.conn_uri}")

        # Field-level options cannot be combined with --conn-uri / --conn-json.
        invalid_args = [arg for arg in alternative_conn_specs if getattr(args, arg) is not None]

        # --conn-extra is only rejected together with --conn-json; with
        # --conn-uri it is applied on top of the parsed URI further down.
        if has_json and args.conn_extra:
            invalid_args.append("--conn-extra")

        if invalid_args:
            raise SystemExit(
                "The following args are not compatible with "
                f"the --conn-{'uri' if has_uri else 'json'} flag: {invalid_args!r}"
            )

    if args.conn_uri:
        new_conn = Connection(conn_id=args.conn_id, description=args.conn_description, uri=args.conn_uri)
        if args.conn_extra is not None:
            new_conn.set_extra(args.conn_extra)
    elif args.conn_json:
        new_conn = Connection.from_json(conn_id=args.conn_id, value=args.conn_json)
        if not new_conn.conn_type:
            raise SystemExit("conn-json is invalid; must supply conn-type")
    else:
        new_conn = Connection(
            conn_id=args.conn_id,
            conn_type=args.conn_type,
            description=args.conn_description,
            host=args.conn_host,
            login=args.conn_login,
            password=args.conn_password,
            schema=args.conn_schema,
            port=args.conn_port,
        )
        if args.conn_extra is not None:
            new_conn.set_extra(args.conn_extra)

    with create_session() as session:
        # Refuse to add a second connection under an already-used conn_id.
        if session.scalar(select(Connection).where(Connection.conn_id == new_conn.conn_id).limit(1)):
            raise SystemExit(f"A connection with `conn_id`={new_conn.conn_id} already exists.")

        session.add(new_conn)

        # Echo the URI exactly as supplied; otherwise rebuild one from the
        # connection fields, with the password replaced by a fixed mask.
        display_uri = args.conn_uri or urlunsplit(
            (
                new_conn.conn_type,
                f"{new_conn.login or ''}:{'******' if new_conn.password else ''}"
                f"@{new_conn.host or ''}:{new_conn.port or ''}",
                new_conn.schema or "",
                "",
                "",
            )
        )
        print(f"Successfully added `conn_id`={new_conn.conn_id} : {display_uri}")