def _get_redshift_functions()

in sdk/aws_orbit_sdk/database.py [0:0]


    def _get_redshift_functions(self) -> Dict[str, Dict[str, str]]:
        """
        Returns a dictionary of all redshift function names  with their parameters/configuration
        using Lambda.list_functions().
        """

        lambda_client = boto3.client("lambda")
        props = get_properties()
        env_name = props["AWS_ORBIT_ENV"]
        team_space = props["AWS_ORBIT_TEAM_SPACE"]
        namespace = f"{env_name}-{team_space}"
        funcs = []
        while True:
            response = lambda_client.list_functions()
            token = response["NextMarker"] if "NextMarker" in response else None
            for func in response["Functions"]:
                if (
                    namespace in func["FunctionName"]
                    and "Environment" in func
                    and "Variables" in func["Environment"]
                    and "RedshiftClusterParameterGroup" in func["Environment"]["Variables"]
                    and "StartRedshift" in func["FunctionName"]
                ):
                    funcs.append(func)
            if token is None:
                break

        func_descs = {}

        for f in funcs:
            user_name = f["FunctionName"][f["FunctionName"].index("StartRedshift") + len("StartRedshift-") :]
            func_desc = f["Environment"]["Variables"]
            del func_desc["RedshiftClusterParameterGroup"]
            del func_desc["RedshiftClusterSubnetGroup"]
            del func_desc["RedshiftClusterSecurityGroup"]
            del func_desc[ORBIT_ENV]
            del func_desc[AWS_ORBIT_TEAM_SPACE]
            del func_desc["SecretId"]
            del func_desc["PortNumber"]
            del func_desc["Role"]
            func_descs[user_name] = func_desc

        return func_descs