def get_aws_ips()

in scripts/scrape-manifest-ip-ranges.py [0:0]


def get_aws_ips():
    """Entry point for the AWS IP address scraper.

    Downloads the AWS IP ranges JSON document from Amazon, checks the HTTP
    status code and a minimum payload size, and validates the parsed document
    against a schema. When a previous copy exists at
    ``/var/hg/aws-ip-ranges.json`` the new document only replaces it if the
    contents differ and the new ``syncToken`` is strictly greater than the
    stored one; otherwise the process exits without writing. The validated
    document is written with ``write_to_file_atomically`` and the diff against
    the previous copy is logged.

    This function terminates the process with ``sys.exit``:

    * with a message (exit status 1) on a non-200 response or an undersized
      payload,
    * with no argument (exit status 0) when there is nothing new to record,
    * with status 1 after logging when the download, JSON parsing, schema
      validation, or a subprocess fails.
    """
    # Bound the download: without a timeout ``requests`` can block forever
    # on a stalled connection.
    request_timeout_seconds = 30
    # Smallest payload size (in bytes) accepted as a plausible document.
    min_document_bytes = 88000

    try:
        # Grab the new data from Amazon
        amazon_ip_ranges_file = Path("/var/hg/aws-ip-ranges.json")
        ip_ranges_response = requests.get(
            "https://ip-ranges.amazonaws.com/ip-ranges.json",
            timeout=request_timeout_seconds,
        )

        # Ensure 200 OK response code
        if ip_ranges_response.status_code != 200:
            sys.exit("HTTP response from Amazon was not 200 OK")

        # Sanity check: ensure the file is an appropriate size
        if len(ip_ranges_response.content) < min_document_bytes:
            sys.exit(
                "The retrieved AWS JSON document is smaller than the minimum allowable file size"
            )

        # JSON Schema for the Amazon IP Ranges JSON document
        amazon_json_schema = Schema(
            {
                "syncToken": str,
                "createDate": str,
                "ipv6_prefixes": [
                    dict
                ],  # If IPv6 is supported in the future, this will need to be defined
                # The prefixes field must meet both requirements:
                # 1. There must be at least one entry for each region containing CI and S3 bundles
                # 2. Must be a list of dicts that fit the schema below
                "prefixes": All(
                    all_required_aws_regions_exist,
                    [
                        {
                            "ip_prefix": is_ip_address_network,
                            "region": str,
                            "service": str,
                            "network_border_group": Optional(str),
                        },
                    ],
                ),
            },
            extra=False,
            required=True,
        )

        # Validate dict schema
        output_as_dict = ip_ranges_response.json()
        validate_with_humanized_errors(output_as_dict, amazon_json_schema)

        # No existing document (e.g. a new deployment) means the whole file
        # is the diff, so start from an empty baseline.
        existing_document_as_dict = {}
        have_previous_document = amazon_ip_ranges_file.is_file()
        if have_previous_document:
            existing_document_as_dict = json.loads(
                amazon_ip_ranges_file.read_bytes()
            )

        file_diff = diff(existing_document_as_dict, output_as_dict, context=0)

        # Sanity check: ensure the syncToken indicates an IP space change has
        # been made since the last recorded change. Only checked when a
        # previous document exists.
        if have_previous_document and (
            not file_diff
            or int(output_as_dict["syncToken"])
            <= int(existing_document_as_dict["syncToken"])
        ):
            # The file contents are the same or the syncToken has not advanced
            sys.exit()

        write_to_file_atomically(amazon_ip_ranges_file, json.dumps(output_as_dict))

        # Print the diff for collection as systemd unit output
        logger.info("AWS IP ranges document has been updated")
        logger.info(file_diff)

    except subprocess.CalledProcessError as cpe:
        # Detail lines use ``error``; only the final call attaches the
        # traceback so it is emitted once rather than once per line.
        logger.error(
            "An error occurred when notifying about changes to the file: exit code %s",
            cpe.returncode,
        )
        logger.error("STDOUT: %s", cpe.stdout)
        logger.exception("STDERR: %s", cpe.stderr)
        sys.exit(1)

    except json.JSONDecodeError as jde:
        # Must stay ahead of the ``requests.RequestException`` handler: newer
        # ``requests`` releases raise a JSON error from ``response.json()``
        # that derives from both exception types.
        logger.exception(
            "An error occurred parsing the data retrieved from Amazon as JSON: %s",
            jde.msg,
        )
        sys.exit(1)

    except requests.RequestException:
        # Connection failures, timeouts, and other transport-level errors.
        logger.exception(
            "An error occurred retrieving the AWS IP ranges document from Amazon"
        )
        sys.exit(1)

    except VoluptuousInvalid as vi:
        logger.error(
            "The JSON data from Amazon does not match the required schema."
        )
        logger.error("Error message: %s", vi.msg)
        logger.error("Error path: %s", vi.path)
        logger.exception("Exception message: %s", vi.error_message)
        sys.exit(1)