in dags/metadb_to_secrets_manager/1.10/metadb_to_secrets_manager.py [0:0]
def write_all_to_aws_sm_fn(**kwargs):
    """Copy all connections and variables from the metadata DB to AWS Secrets Manager.

    Each connection is written under ``<connections_prefix>/<conn_id>`` with
    its URI as the value; each variable is written under
    ``<variables_prefix>/<key>`` with its value. The actual write is delegated
    to ``write_to_sm_fn`` (defined elsewhere in this file).

    The prefixes come from the ``backend_kwargs`` option of the ``[secrets]``
    config section when it is set (expected to be a JSON object). A prefix
    that is absent from that JSON, or an unset ``backend_kwargs``, falls back
    to ``airflow/connections`` / ``airflow/variables``.

    Args:
        **kwargs: Task context. Must contain ``conf``, an object exposing
            ``get(section=..., key=...)``.

    Returns:
        The string ``"OK"`` once every entry has been handed to
        ``write_to_sm_fn``.

    Raises:
        KeyError: If ``conf`` is missing from ``kwargs``.
        ValueError: If ``backend_kwargs`` is set but is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    ### determine secrets manager prefixes
    connections_prefix = 'airflow/connections'
    variables_prefix = 'airflow/variables'
    backend_kwargs = kwargs['conf'].get(section='secrets', key='backend_kwargs')
    if backend_kwargs:
        parsed_kwargs = json.loads(backend_kwargs)
        # backend_kwargs may legitimately carry only other options (e.g. a
        # profile or region), so a missing prefix keeps its default instead
        # of raising KeyError.
        connections_prefix = parsed_kwargs.get(
            'connections_prefix', connections_prefix
        ).strip().rstrip('/')
        variables_prefix = parsed_kwargs.get(
            'variables_prefix', variables_prefix
        ).strip().rstrip('/')
        print("using connections_prefix=",connections_prefix,",variables_prefix=",variables_prefix,"...")
    else:
        print("backend_kwargs undefined--using defaults connections_prefix=",connections_prefix,",variables_prefix=",variables_prefix)

    ### set up SQL and AWSSM
    session = settings.Session()
    try:
        hook = AwsHook()
        client = hook.get_client_type('secretsmanager')

        ### transfer connections
        query = session.query(Connection)
        print(query.count()," connections: ")
        for curr_entry in query:
            curr_id = connections_prefix + '/' + curr_entry.conn_id
            curr_val = curr_entry.get_uri()
            write_to_sm_fn(name=curr_id, value=curr_val, client=client)

        ### transfer variables
        query = session.query(Variable)
        print(query.count()," variables: ")
        for curr_entry in query:
            curr_id = variables_prefix + '/' + curr_entry.key
            curr_val = curr_entry.get_val()
            write_to_sm_fn(name=curr_id, value=curr_val, client=client)
    finally:
        # Always release the DB session, even if the hook setup or a write
        # fails part-way through the transfer.
        session.close()
    return "OK"