emails/sns.py (76 lines of code) (raw):

# Inspired by django-bouncy utils: # https://github.com/organizerconnect/django-bouncy/blob/master/django_bouncy/utils.py import base64 import logging from typing import Any from urllib.request import urlopen from django.conf import settings from django.core.cache import caches from django.core.exceptions import SuspiciousOperation from cryptography import x509 from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding, rsa logger = logging.getLogger("events") NOTIFICATION_HASH_FORMAT = """\ Message {Message} MessageId {MessageId} Subject {Subject} Timestamp {Timestamp} TopicArn {TopicArn} Type {Type} """ NOTIFICATION_WITHOUT_SUBJECT_HASH_FORMAT = """\ Message {Message} MessageId {MessageId} Timestamp {Timestamp} TopicArn {TopicArn} Type {Type} """ SUBSCRIPTION_HASH_FORMAT = """\ Message {Message} MessageId {MessageId} SubscribeURL {SubscribeURL} Timestamp {Timestamp} Token {Token} TopicArn {TopicArn} Type {Type} """ SUPPORTED_SNS_TYPES = [ "SubscriptionConfirmation", "Notification", ] class VerificationFailed(ValueError): pass def verify_from_sns(json_body: dict[str, Any]) -> dict[str, Any]: """ Raise an exception if SNS signature verification fails. https://docs.aws.amazon.com/sns/latest/dg/sns-verify-signature-of-message.html Only supports SignatureVersion 1. SignatureVersion 2 (SHA256) was added in September 2022, and requires opt-in. """ signing_cert_url = json_body["SigningCertURL"] cert_pubkey = _get_signing_public_key(signing_cert_url) signature = base64.decodebytes(json_body["Signature"].encode()) hash_format = _get_hash_format(json_body) try: cert_pubkey.verify( signature, hash_format.format(**json_body).encode(), padding.PKCS1v15(), hashes.SHA1(), # noqa: S303 # Use of insecure hash SHA1 ) except InvalidSignature as e: raise VerificationFailed( f"Invalid signature with SigningCertURL {signing_cert_url}" ) from e return json_body def _get_hash_format(json_body: dict[str, Any]) -> str: message_type = json_body["Type"] if message_type == "Notification": if "Subject" in json_body.keys(): return NOTIFICATION_HASH_FORMAT return NOTIFICATION_WITHOUT_SUBJECT_HASH_FORMAT return SUBSCRIPTION_HASH_FORMAT def _get_signing_public_key(cert_url: str) -> rsa.RSAPublicKey: """ Download the signing certificate and return the public key. Or, return the cached public key from a previous call. """ cert_url_origin = f"https://sns.{settings.AWS_REGION}.amazonaws.com/" if not (cert_url.startswith(cert_url_origin)): raise SuspiciousOperation( f'SNS SigningCertURL "{cert_url}" did not start with "{cert_url_origin}"' ) key_cache = caches[getattr(settings, "AWS_SNS_KEY_CACHE", "default")] cache_key = f"{cert_url}:public_key" public_pem = key_cache.get(cache_key) set_cache = False if public_pem: cert_pubkey = serialization.load_pem_public_key(public_pem) else: set_cache = True response = urlopen(cert_url) # noqa: S310 (check for custom scheme) cert_pem = response.read() # Extract the first certificate in the file and confirm it's a valid # PEM certificate certs = x509.load_pem_x509_certificates(cert_pem) # A proper certificate file will contain 1 certificate if len(certs) != 1: raise VerificationFailed( f"SigningCertURL {cert_url} has {len(certs)} certificates." ) cert_pubkey = certs[0].public_key() public_pem = cert_pubkey.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo, ) if not isinstance(cert_pubkey, rsa.RSAPublicKey): raise VerificationFailed(f"SigningCertURL {cert_url} is not an RSA key") if set_cache: key_cache.set(cache_key, public_pem) return cert_pubkey