in scripts/scrape-manifest-ip-ranges.py [0:0]
def get_azure_ips():
    """Entry point for the Azure IP address scraper.

    Downloads the Azure IP ranges JSON document, validates it against a
    known schema and, when it differs from the copy already on disk,
    atomically rewrites ``/var/hg/azure-ip-ranges.json`` with the new
    document and logs the difference.

    The function terminates the process via ``sys.exit`` in these cases:

    * the HTTP status is not 200, or the body is smaller than 8000 bytes
      (exit with an explanatory message);
    * the on-disk document is identical to the downloaded one (clean exit,
      nothing is written);
    * the request fails, the response is not valid JSON, or the document
      does not match the schema (exit status 1 after logging).
    """
    # Destination for the validated document.
    azure_ip_ranges_file = Path("/var/hg/azure-ip-ranges.json")
    # Upper bound (seconds) on connect/read so a stalled server cannot hang
    # the scraper indefinitely; a timeout surfaces as a RequestException.
    request_timeout_seconds = 60

    try:
        # Fetch the latest Azure IP ranges from the URL
        ip_ranges_response = requests.get(
            "https://raw.githubusercontent.com/mozilla-platform-ops/azure-public-ip-ranges/refs/heads/main/az_ips.json",
            timeout=request_timeout_seconds,
        )

        # Ensure the HTTP response is successful
        if ip_ranges_response.status_code != 200:
            sys.exit("HTTP response from Azure IP ranges was not 200 OK")

        # Sanity check: ensure the file is not unexpectedly small
        if len(ip_ranges_response.content) < 8000:
            sys.exit(
                "The retrieved Azure JSON document is smaller than the minimum allowable file size"
            )

        # Define the schema for validating the Azure IP ranges document
        azure_json_schema = Schema(
            {
                "changeNumber": int,
                "cloud": str,
                "values": All(
                    all_required_azure_regions_exist,  # Validate required Azure regions
                    [
                        {
                            "name": str,  # Ensure this contains "Storage.$REGION"
                            "id": str,
                            "properties": {
                                "changeNumber": int,
                                "region": str,
                                "regionId": int,
                                "platform": str,
                                "systemService": str,
                                "addressPrefixes": [
                                    is_ip_address_network
                                ],  # Validate each IP network
                                "networkFeatures": Any(list, None),
                            },
                        }
                    ],
                ),
            },
            extra=False,
            required=True,
        )

        # Parse the Azure IP ranges JSON data
        output_as_dict = ip_ranges_response.json()

        # Validate the parsed data against the schema
        validate_with_humanized_errors(output_as_dict, azure_json_schema)

        # Load the previous document, if any, to compare against. With no
        # usable previous copy, everything is considered new.
        existing_document_as_dict = {}
        have_existing_document = False
        if azure_ip_ranges_file.is_file():
            try:
                existing_document_as_dict = json.loads(
                    azure_ip_ranges_file.read_bytes()
                )
                have_existing_document = True
            except ValueError:
                # A corrupt local copy must not be reported as bad data from
                # Azure, nor block every future run: replace it with the
                # freshly validated document instead.
                logger.warning(
                    "Existing Azure IP ranges file %s is not valid JSON and will be replaced",
                    azure_ip_ranges_file,
                )

        file_diff = diff(existing_document_as_dict, output_as_dict, context=0)

        # Exit if the file contents are unchanged
        if have_existing_document and not file_diff:
            sys.exit()

        # Write the new data to the file atomically
        write_to_file_atomically(azure_ip_ranges_file, json.dumps(output_as_dict))

        # Log the changes to the systemd unit output
        logger.info("Azure IP ranges document has been updated")
        logger.info(file_diff)
    except requests.exceptions.RequestException as request_error:
        logger.exception(
            "An error occurred while retrieving Azure IP ranges: %s", request_error
        )
        sys.exit(1)
    except json.JSONDecodeError as jde:
        logger.exception(
            "An error occurred parsing the data retrieved from Azure as JSON: %s",
            jde.msg,
        )
        sys.exit(1)
    # NOTE(review): upstream voluptuous.humanize.validate_with_humanized_errors
    # appears to raise voluptuous.Error rather than Invalid -- confirm that the
    # VoluptuousInvalid alias imported by this file actually covers it.
    except VoluptuousInvalid as vi:
        # Log the traceback once; the remaining lines only add detail.
        logger.exception("The JSON data from Azure does not match the required schema.")
        logger.error("Error message: %s", vi.msg)
        logger.error("Error path: %s", vi.path)
        logger.error("Exception message: %s", vi.error_message)
        sys.exit(1)