in airflow-core/src/airflow/cli/commands/connection_command.py [0:0]
def connections_add(args):
    """
    Add new connection.

    Builds a ``Connection`` from exactly one of three input styles -- a URI
    (``conn_uri``), a JSON document (``conn_json``), or the individual
    ``conn_*`` fields -- and adds it to the session unless a connection with
    the same ``conn_id`` is already stored.

    :param args: parsed CLI namespace. Reads ``conn_id``, ``conn_uri``,
        ``conn_json``, ``conn_type``, ``conn_description``, ``conn_host``,
        ``conn_login``, ``conn_password``, ``conn_schema``, ``conn_port`` and
        ``conn_extra``. ``args.conn_type`` is overwritten with ``"generic"``
        when the supplied type is not a known connection type.
    :raises SystemExit: if ``conn_id`` fails key validation, if the supplied
        flags are missing or mutually incompatible, if the URI is invalid, if
        the JSON yields no ``conn_type``, or if the ``conn_id`` already exists.
    """
    has_uri = bool(args.conn_uri)
    has_json = bool(args.conn_json)
    has_type = bool(args.conn_type)

    # Validate connection-id
    try:
        helpers.validate_key(args.conn_id, max_length=200)
    except Exception as e:
        # Chain the cause so the underlying validation error is not lost.
        raise SystemExit(f"Could not create connection. {e}") from e

    if not has_type and not (has_json or has_uri):
        raise SystemExit("Must supply either conn-uri or conn-json if not supplying conn-type")

    if has_json and has_uri:
        raise SystemExit("Cannot supply both conn-uri and conn-json")

    if has_type:
        # Look the supported types up once and reuse them for the message.
        supported_types = _get_connection_types()
        if args.conn_type not in supported_types:
            # An unknown type is not fatal: warn and fall back to "generic".
            warnings.warn(
                f"The type provided to --conn-type is invalid: {args.conn_type}", UserWarning, stacklevel=4
            )
            warnings.warn(
                f"Supported --conn-types are:{supported_types}. "
                "Hence overriding the conn-type with generic",
                UserWarning,
                stacklevel=4,
            )
            args.conn_type = "generic"

    if has_uri or has_json:
        if has_uri and not _valid_uri(args.conn_uri):
            raise SystemExit(f"The URI provided to --conn-uri is invalid: {args.conn_uri}")

        # Every attribute named in alternative_conn_specs must be unset when
        # the connection is described by a URI or a JSON document.
        invalid_args = [arg for arg in alternative_conn_specs if getattr(args, arg) is not None]

        # --conn-extra is rejected with --conn-json only; the URI branch
        # below still applies it.
        if has_json and args.conn_extra:
            invalid_args.append("--conn-extra")

        if invalid_args:
            raise SystemExit(
                "The following args are not compatible with "
                f"the --conn-{'uri' if has_uri else 'json'} flag: {invalid_args!r}"
            )

    if has_uri:
        new_conn = Connection(conn_id=args.conn_id, description=args.conn_description, uri=args.conn_uri)
        if args.conn_extra is not None:
            new_conn.set_extra(args.conn_extra)
    elif has_json:
        new_conn = Connection.from_json(conn_id=args.conn_id, value=args.conn_json)
        if not new_conn.conn_type:
            raise SystemExit("conn-json is invalid; must supply conn-type")
    else:
        new_conn = Connection(
            conn_id=args.conn_id,
            conn_type=args.conn_type,
            description=args.conn_description,
            host=args.conn_host,
            login=args.conn_login,
            password=args.conn_password,
            schema=args.conn_schema,
            port=args.conn_port,
        )
        if args.conn_extra is not None:
            new_conn.set_extra(args.conn_extra)

    with create_session() as session:
        existing = session.scalar(select(Connection).where(Connection.conn_id == new_conn.conn_id).limit(1))
        if existing:
            raise SystemExit(f"A connection with `conn_id`={new_conn.conn_id} already exists.")

        session.add(new_conn)
        # NOTE(review): a supplied --conn-uri is echoed verbatim, whereas the
        # URI rebuilt from individual fields masks the password -- confirm the
        # unmasked echo is intended.
        shown_uri = args.conn_uri or urlunsplit(
            (
                new_conn.conn_type,
                f"{new_conn.login or ''}:{'******' if new_conn.password else ''}"
                f"@{new_conn.host or ''}:{new_conn.port or ''}",
                new_conn.schema or "",
                "",
                "",
            )
        )
        print(f"Successfully added `conn_id`={new_conn.conn_id} : {shown_uri}")