scripts/scrape-manifest-ip-ranges.py (352 lines of code) (raw):

#!/usr/bin/env python3 # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. # This script provides data validation around the bloxtool CLI, which # retrieves Mozilla IP address ranges froma third party vendor. The # script runs as a recurring systemd task. import ipaddress import json import logging import subprocess import sys import time from pathlib import Path import requests from datadiff import diff from voluptuous import ( All, Any, In, Invalid as VoluptuousInvalid, Optional, Schema, truth, ) from voluptuous.humanize import validate_with_humanized_errors formatter = logging.Formatter("%(name)s %(message)s") formatter.converter = time.gmtime handler = logging.StreamHandler(sys.stdout) handler.setFormatter(formatter) logger = logging.getLogger("mozilla-ip-scraper") logger.addHandler(handler) def write_to_file_atomically(file_path: Path, content: str) -> None: """Writes new data to a temp file, then renames the temp file to the existing filename""" temp_file_path = file_path.with_suffix(".tmp") with temp_file_path.open(mode="w") as temp_file: temp_file.write(content) temp_file_path.rename(file_path) @truth def is_ip_address_network(value: str) -> bool: """Validates if a given value (interpreted as a str) represents an IP address network""" try: # This call will raise a ValueError if value is not a valid ip address range ipaddress.ip_network(value, strict=True) return True except ValueError: return False @truth def all_required_aws_regions_exist(prefixes: list) -> bool: """Validates that the set of all required AWS regions is a subset of all the regions in the iterable of IP networks""" required_regions = {"us-west-2"} prefixes_in_new_document = {prefix_object["region"] for prefix_object in prefixes} return required_regions <= prefixes_in_new_document @truth def all_required_gcp_regions_exist(prefixes: list) -> bool: """Validates that the set of all required GCP regions is a subset of all the regions in the iterable of IP networks""" required_regions = { "northamerica-northeast1", "us-central1", "us-west1", } prefixes_in_new_document = {prefix_object["scope"] for prefix_object in prefixes} return required_regions <= prefixes_in_new_document @truth def all_required_azure_regions_exist(values: list) -> bool: """Checks if all required Azure regions (in lowercase) are present in the 'properties.region' field for entries where the 'name' field starts with 'Storage.'.""" # Define the required Azure regions in lowercase required_regions = { "canadacentral", "centralindia", "centralus", "eastus", "eastus2", "northcentralus", "northeurope", "southindia", "westus", "westus2", "westus3", } # Extract the regions from the 'name' field where the region appears after "Storage." regions_in_new_document = { value["properties"]["region"].lower() for value in values if value["name"].startswith("Storage.") } # Validate that all required regions are present return required_regions <= regions_in_new_document def get_mozilla_office_ips(): """Entry point for the Mozilla office IP scraper Calls out to bloxtool to obtain Mozilla network information in JSON format. Validates the JSON against a known schema and atomically re-writes a file with the CIDR representations of Mozilla office IP address spaces. """ try: mozilla_ip_ranges_file = Path("/var/hg/moz-ip-ranges.txt") bloxtool_config_file = Path("/etc/mercurial/bloxtool.ini") bloxtool_command = [ "/var/hg/venv_tools/bin/bloxtool", "network", "search", "attribute", "subnet-purpose", "value", "nat-pool", "--format=json", f"--config={bloxtool_config_file}", ] bloxtool_json_schema = Schema( [ { "comment": str, "_ref": str, "network": is_ip_address_network, "network_view": str, } ], extra=False, required=True, ) # Get raw string output and convert to Python dict process_output = subprocess.run( bloxtool_command, check=True, encoding="utf-8", stdout=subprocess.PIPE ).stdout output_as_dict = json.loads(process_output) # Verify dict schema validate_with_humanized_errors(output_as_dict, bloxtool_json_schema) write_to_file_atomically( mozilla_ip_ranges_file, "\n".join(i["network"] for i in output_as_dict) ) except subprocess.CalledProcessError as cpe: logger.exception( "An error occurred while executing the bloxtool command: exit code %s" % cpe.returncode ) logger.exception("STDOUT: %s" % cpe.stdout) logger.exception("STDERR: %s" % cpe.stderr) sys.exit(1) except json.JSONDecodeError as jde: logger.exception( "An error occurred parsing the bloxtool output as JSON: %s" % jde.msg ) sys.exit(1) except VoluptuousInvalid as vi: logger.exception( "The JSON data from bloxtool does not match the required schema." ) logger.exception("Error message: %s" % vi.msg) logger.exception("Error path: %s" % vi.path) logger.exception("Exception message: %s" % vi.error_message) sys.exit(1) def get_aws_ips(): """Entry point for the AWS IP address scraper Downloads the AWS IP ranges JSON document from Amazon and verifies against a known schema. Atomically rewrites a file with the CIDR representations of AWS IP address spaces. """ try: # Grab the new data from Amazon amazon_ip_ranges_file = Path("/var/hg/aws-ip-ranges.json") ip_ranges_response = requests.get( "https://ip-ranges.amazonaws.com/ip-ranges.json" ) # Ensure 200 OK response code if ip_ranges_response.status_code != 200: sys.exit("HTTP response from Amazon was not 200 OK") # Sanity check: ensure the file is an appropriate size if len(ip_ranges_response.content) < 88000: sys.exit( "The retrieved AWS JSON document is smaller than the minimum allowable file size" ) # JSON Schema for the Amazon IP Ranges JSON document amazon_json_schema = Schema( { "syncToken": str, "createDate": str, "ipv6_prefixes": [ dict ], # If IPv6 is supported in the future, this will need to be defined # The prefixes field must meet both requirements: # 1. There must be at least one entry for each region containing CI and S3 bundles # 2. Must be a list of dicts that fit the schema below "prefixes": All( all_required_aws_regions_exist, [ { "ip_prefix": is_ip_address_network, "region": str, "service": str, "network_border_group": Optional(str), }, ], ), }, extra=False, required=True, ) # Validate dict schema output_as_dict = ip_ranges_response.json() validate_with_humanized_errors(output_as_dict, amazon_json_schema) # Sanity check: ensure the syncToken indicates an IP space change has been made # since the last recorded change. Only check if a file exists, in case of new deployments if amazon_ip_ranges_file.is_file(): file_bytes = amazon_ip_ranges_file.read_bytes() existing_document_as_dict = json.loads(file_bytes) file_diff = diff(existing_document_as_dict, output_as_dict, context=0) # Exit if the file contents are the same or the syncToken has not changed if not file_diff or int(output_as_dict["syncToken"]) <= int( existing_document_as_dict["syncToken"] ): sys.exit() else: existing_document_as_dict = ( {} ) # No existing document means whole file is the diff file_diff = diff(existing_document_as_dict, output_as_dict, context=0) write_to_file_atomically(amazon_ip_ranges_file, json.dumps(output_as_dict)) # Print the diff for collection as systemd unit output logger.info("AWS IP ranges document has been updated") logger.info(file_diff) except subprocess.CalledProcessError as cpe: logger.exception( "An error occurred when notifying about changes to the file: exit code %s" % cpe.returncode ) logger.exception("STDOUT: %s" % cpe.stdout) logger.exception("STDERR: %s" % cpe.stderr) sys.exit(1) except json.JSONDecodeError as jde: logger.exception( "An error occurred parsing the data retrieved from Amazon as JSON: %s" % jde.msg ) sys.exit(1) except VoluptuousInvalid as vi: logger.exception( "The JSON data from Amazon does not match the required schema." ) logger.exception("Error message: %s" % vi.msg) logger.exception("Error path: %s" % vi.path) logger.exception("Exception message: %s" % vi.error_message) sys.exit(1) def get_gcp_ips(): """Entry point for the GCP IP address scraper. Downloads the GCP IP ranges JSON document from Google and verifies against a known schema. Atomically rewrites a file with the CIDR representations of GCP IP address spaces. """ try: # Grab the new data from Amazon gcp_ip_ranges_file = Path("/var/hg/gcp-ip-ranges.json") ip_ranges_response = requests.get("https://www.gstatic.com/ipranges/cloud.json") # Ensure 200 OK response code if ip_ranges_response.status_code != 200: sys.exit("HTTP response from Google was not 200 OK") # Sanity check: ensure the file is an appropriate size if len(ip_ranges_response.content) < 8000: sys.exit( "The retrieved GCP JSON document is smaller than the minimum allowable file size" ) # JSON Schema for the Google IP Ranges JSON document google_json_schema = Schema( { "syncToken": str, "creationTime": str, # The prefixes field must meet both requirements: # 1. There must be at least one entry for each region containing CI and S3 bundles # 2. Must be a list of dicts that fit the schema below "prefixes": All( all_required_gcp_regions_exist, [ { # One of these tags must be present, and it's value must be an IP block. In({"ipv4Prefix", "ipv6Prefix"}): is_ip_address_network, "scope": str, "service": str, }, ], ), }, extra=False, required=True, ) # Validate dict schema output_as_dict = ip_ranges_response.json() validate_with_humanized_errors(output_as_dict, google_json_schema) # Sanity check: ensure the syncToken indicates an IP space change has been made # since the last recorded change. Only check if a file exists, in case of new deployments if gcp_ip_ranges_file.is_file(): file_bytes = gcp_ip_ranges_file.read_bytes() existing_document_as_dict = json.loads(file_bytes) file_diff = diff(existing_document_as_dict, output_as_dict, context=0) # Exit if the file contents are the same or the syncToken has not changed if not file_diff or int(output_as_dict["syncToken"]) <= int( existing_document_as_dict["syncToken"] ): sys.exit() else: existing_document_as_dict = ( {} ) # No existing document means whole file is the diff file_diff = diff(existing_document_as_dict, output_as_dict, context=0) write_to_file_atomically(gcp_ip_ranges_file, json.dumps(output_as_dict)) # Print the diff for collection as systemd unit output logger.info("GCP IP ranges document has been updated") logger.info(file_diff) except subprocess.CalledProcessError as cpe: logger.exception( "An error occurred when notifying about changes to the file: exit code %s" % cpe.returncode ) logger.exception("STDOUT: %s" % cpe.stdout) logger.exception("STDERR: %s" % cpe.stderr) sys.exit(1) except json.JSONDecodeError as jde: logger.exception( "An error occurred parsing the data retrieved from GCP as JSON: %s" % jde.msg ) sys.exit(1) except VoluptuousInvalid as vi: logger.exception("The JSON data from GCP does not match the required schema.") logger.exception("Error message: %s" % vi.msg) logger.exception("Error path: %s" % vi.path) logger.exception("Exception message: %s" % vi.error_message) sys.exit(1) def get_azure_ips(): """Entry point for the Azure IP address scraper. Downloads the Azure IP ranges JSON document and verifies against a known schema. Atomically rewrites a file with the CIDR representations of Azure IP address spaces. """ try: # Define the file path for storing Azure IP ranges azure_ip_ranges_file = Path("/var/hg/azure-ip-ranges.json") # Fetch the latest Azure IP ranges from the URL ip_ranges_response = requests.get( "https://raw.githubusercontent.com/mozilla-platform-ops/azure-public-ip-ranges/refs/heads/main/az_ips.json" ) # Ensure the HTTP response is successful if ip_ranges_response.status_code != 200: sys.exit("HTTP response from Azure IP ranges was not 200 OK") # Sanity check: ensure the file is not unexpectedly small if len(ip_ranges_response.content) < 8000: sys.exit( "The retrieved Azure JSON document is smaller than the minimum allowable file size" ) # Define the schema for validating the Azure IP ranges document azure_json_schema = Schema( { "changeNumber": int, "cloud": str, "values": All( all_required_azure_regions_exist, # Validate required Azure regions [ { "name": str, # Ensure this contains "Storage.$REGION" "id": str, "properties": { "changeNumber": int, "region": str, "regionId": int, "platform": str, "systemService": str, "addressPrefixes": [ is_ip_address_network ], # Validate each IP network "networkFeatures": Any(list, None), }, } ], ), }, extra=False, required=True, ) # Parse the Azure IP ranges JSON data output_as_dict = ip_ranges_response.json() # Validate the parsed data against the schema validate_with_humanized_errors(output_as_dict, azure_json_schema) # Sanity check: if the file exists, ensure there's a change in content if azure_ip_ranges_file.is_file(): existing_file_bytes = azure_ip_ranges_file.read_bytes() existing_document_as_dict = json.loads(existing_file_bytes) file_diff = diff(existing_document_as_dict, output_as_dict, context=0) # Exit if the file contents are unchanged if not file_diff: sys.exit() # If no existing file, everything is considered new else: existing_document_as_dict = {} file_diff = diff(existing_document_as_dict, output_as_dict, context=0) # Write the new data to the file atomically write_to_file_atomically(azure_ip_ranges_file, json.dumps(output_as_dict)) # Log the changes to the systemd unit output logger.info("Azure IP ranges document has been updated") logger.info(file_diff) except requests.exceptions.RequestException as re: logger.exception(f"An error occurred while retrieving Azure IP ranges: {re}") sys.exit(1) except json.JSONDecodeError as jde: logger.exception( "An error occurred parsing the data retrieved from Azure as JSON: %s" % jde.msg ) sys.exit(1) except VoluptuousInvalid as vi: logger.exception("The JSON data from Azure does not match the required schema.") logger.exception("Error message: %s" % vi.msg) logger.exception("Error path: %s" % vi.path) logger.exception("Exception message: %s" % vi.error_message) sys.exit(1) # Register possible commands COMMANDS = { "aws": get_aws_ips, "azure": get_azure_ips, "gcloud": get_gcp_ips, "moz-offices": get_mozilla_office_ips, } if __name__ == "__main__": if len(sys.argv) != 2 or sys.argv[1] not in COMMANDS: sys.exit( "usage: {executable} <{possible_commands}>".format( executable=sys.argv[0], possible_commands=" | ".join(COMMANDS.keys()) ) ) COMMANDS[sys.argv[1]]() sys.exit()