in sdk/aws_orbit_sdk/database.py [0:0]
def _get_redshift_functions(self) -> Dict[str, Dict[str, str]]:
    """
    Return the user-facing configuration of every "StartRedshift" Lambda
    function that belongs to the current environment/team namespace.

    All pages of ``Lambda.list_functions()`` are walked. A function is selected
    when its name contains both ``"<AWS_ORBIT_ENV>-<AWS_ORBIT_TEAM_SPACE>"``
    (values read from ``get_properties()``) and ``"StartRedshift"``, and its
    environment variables include ``"RedshiftClusterParameterGroup"``.

    Returns
    -------
    Dict[str, Dict[str, str]]
        Maps the part of the function name that follows ``"StartRedshift-"``
        to that function's environment variables, with the internal entries
        (cluster parameter/subnet/security groups, environment and team-space
        entries, secret id, port number and role) removed.
    """
    lambda_client = boto3.client("lambda")
    props = get_properties()
    env_name = props["AWS_ORBIT_ENV"]
    team_space = props["AWS_ORBIT_TEAM_SPACE"]
    namespace = f"{env_name}-{team_space}"

    marker_prefix = "StartRedshift"
    func_descs = {}
    page_args = {}
    while True:
        response = lambda_client.list_functions(**page_args)
        for func in response["Functions"]:
            func_name = func["FunctionName"]
            if namespace not in func_name or marker_prefix not in func_name:
                continue
            variables = func.get("Environment", {}).get("Variables", {})
            if "RedshiftClusterParameterGroup" not in variables:
                continue

            # NOTE(review): ORBIT_ENV and AWS_ORBIT_TEAM_SPACE are referenced as
            # bare names, presumably module-level constants holding variable
            # names; they are not visible in this part of the file -- confirm
            # they are defined.
            internal_keys = (
                "RedshiftClusterParameterGroup",
                "RedshiftClusterSubnetGroup",
                "RedshiftClusterSecurityGroup",
                ORBIT_ENV,
                AWS_ORBIT_TEAM_SPACE,
                "SecretId",
                "PortNumber",
                "Role",
            )
            # Everything after "StartRedshift-" in the function name is the key.
            user_name = func_name[func_name.index(marker_prefix) + len(marker_prefix + "-") :]
            # Build a filtered copy instead of deleting in place: a function
            # missing one of the internal variables no longer aborts the whole
            # listing with KeyError, and the response payload is not mutated.
            func_descs[user_name] = {key: value for key, value in variables.items() if key not in internal_keys}

        token = response.get("NextMarker")
        if not token:
            break
        # Hand the marker back so the next call returns the following page;
        # without it the first page would be fetched again indefinitely.
        page_args["Marker"] = token

    return func_descs