def main()

in helpers/lambda_env_update.py [0:0]


def main(event: dict, context: object) -> dict:

    import logging as log
    import cfnresponse
    import boto3
    from botocore.exceptions import ClientError

    log.getLogger().setLevel(log.INFO)

    # This needs to change if there are to be multiple resources
    # in the same stack
    physical_id = event["ResourceProperties"]["PhysicalId"]

    # set clients
    la_client = boto3.client("lambda")
    cf_client = boto3.client("cloudfront")
    s3_client = boto3.client("s3")
    es_client = boto3.client("es")

    try:
        log.info(f"Input event: {event}")

        # Check if this is a Create and we're failing Creates
        if event["RequestType"] == "Create" and event["ResourceProperties"].get(
            "FailCreate", False
        ):
            raise RuntimeError("Create failure requested")

        # Update the lambda env if event is create
        if event["RequestType"] in ["Create", "Update"]:
            # get the functions name
            functions_list = la_client.list_functions()["Functions"]
            api_function = [
                fcn["FunctionName"]
                for fcn in functions_list
                if fcn["Description"] == "kibana api gateway lambda"
            ][0]

            # get the cloudfront domain name
            distributions_list = cf_client.list_distributions()["DistributionList"][
                "Items"
            ]
            cloudfront_domain = [
                dist["DomainName"]
                for dist in distributions_list
                if "elkk-kibana" in dist["Origins"]["Items"][0]["DomainName"]
            ][0]

            # get the s3 bucket name
            s3_bucket_list = s3_client.list_buckets()
            for bkt in s3_bucket_list["Buckets"]:
                try:
                    bkt_tags = s3_client.get_bucket_tagging(Bucket=bkt["Name"])[
                        "TagSet"
                    ]
                    for keypairs in bkt_tags:
                        if (
                            keypairs["Key"] == "aws:cloudformation:stack-name"
                            and keypairs["Value"] == "elkk-kibana"
                        ):
                            kibana_bucket_name = bkt["Name"]
                except ClientError:
                    pass

            # get the elastic endpoint details
            es_domains = es_client.list_domain_names()
            try:
                es_domain = [
                    dom["DomainName"]
                    for dom in es_domains["DomainNames"]
                    if "elkk-" in dom["DomainName"]
                ][0]
            except IndexError:
                return ""

            es_endpoint = es_client.describe_elasticsearch_domain(DomainName=es_domain)
            elastic_endpoint = es_endpoint["DomainStatus"]["Endpoints"]["vpc"]

            # update the functions env
            update_env = la_client.update_function_configuration(
                FunctionName=api_function,
                Environment={
                    "Variables": {
                        "AES_DOMAIN_ENDPOINT": f"https://{elastic_endpoint}",
                        "KIBANA_BUCKET": kibana_bucket_name,
                        "S3_MAX_AGE": "2629746",
                        "LOG_LEVEL": "warning",
                        "CLOUDFRONT_CACHE_URL": f"https://{cloudfront_domain}/bucket_cached",
                    }
                },
            )
        # do some reporting
        attributes = {"Response": update_env}
        cfnresponse.send(event, context, cfnresponse.SUCCESS, attributes, physical_id)

    except Exception as e:
        log.exception(e)
        # cfnresponse's error message is always "see CloudWatch"
        cfnresponse.send(event, context, cfnresponse.FAILED, {}, physical_id)