csp/middleware.py (95 lines of code) (raw):
from __future__ import annotations
import base64
import http.client as http_client
import os
import warnings
from dataclasses import asdict, dataclass
from functools import partial
from typing import TYPE_CHECKING
from django.conf import settings
from django.utils.deprecation import MiddlewareMixin
from django.utils.functional import SimpleLazyObject, empty
from csp.constants import HEADER, HEADER_REPORT_ONLY
from csp.exceptions import CSPNonceError
from csp.utils import DIRECTIVES_T, build_policy
if TYPE_CHECKING:
from django.http import HttpRequest, HttpResponseBase
@dataclass
class PolicyParts:
# A dataclass is used rather than a namedtuple so that the attributes are mutable
config: DIRECTIVES_T | None = None
update: DIRECTIVES_T | None = None
replace: DIRECTIVES_T | None = None
nonce: str | None = None
class CheckableLazyObject(SimpleLazyObject):
"""A SimpleLazyObject where bool(obj) returns True if no longer lazy"""
def __bool__(self) -> bool:
"""
If the wrapped function has been evaluated, return True.
If the wrapped function has not been evalated, return False.
"""
return getattr(self, "_wrapped") is not empty
class CSPMiddleware(MiddlewareMixin):
"""
Implements the Content-Security-Policy response header, which
conforming user-agents can use to restrict the permitted sources
of various content.
See http://www.w3.org/TR/CSP/
Can be customised by subclassing and extending the get_policy_parts method.
"""
def _make_nonce(self, request: HttpRequest) -> str:
# Ensure that any subsequent calls to request.csp_nonce return the same value
stored_nonce = getattr(request, "_csp_nonce", None)
if isinstance(stored_nonce, str):
return stored_nonce
nonce = base64.b64encode(os.urandom(16)).decode("ascii")
setattr(request, "_csp_nonce", nonce)
return nonce
@staticmethod
def _csp_nonce_post_response() -> None:
raise CSPNonceError(
"The 'csp_nonce' attribute is not available after the CSP header has been written. Consider adjusting your MIDDLEWARE order."
)
def process_request(self, request: HttpRequest) -> None:
nonce = partial(self._make_nonce, request)
setattr(request, "csp_nonce", CheckableLazyObject(nonce))
def process_response(self, request: HttpRequest, response: HttpResponseBase) -> HttpResponseBase:
# Check for debug view
exempted_debug_codes = (
http_client.INTERNAL_SERVER_ERROR,
http_client.NOT_FOUND,
)
if response.status_code in exempted_debug_codes and settings.DEBUG:
return response
policy_parts = self.get_policy_parts(request=request, response=response)
csp = build_policy(**asdict(policy_parts))
if csp:
# Only set header if not already set and not an excluded prefix and not exempted.
is_not_exempt = getattr(response, "_csp_exempt", False) is False
no_header = HEADER not in response
policy = getattr(settings, "CONTENT_SECURITY_POLICY", None) or {}
prefixes = policy.get("EXCLUDE_URL_PREFIXES", None) or ()
is_not_excluded = not request.path_info.startswith(tuple(prefixes))
if no_header and is_not_exempt and is_not_excluded:
response[HEADER] = csp
policy_parts_ro = self.get_policy_parts(request=request, response=response, report_only=True)
csp_ro = build_policy(**asdict(policy_parts_ro), report_only=True)
if csp_ro:
# Only set header if not already set and not an excluded prefix and not exempted.
is_not_exempt = getattr(response, "_csp_exempt_ro", False) is False
no_header = HEADER_REPORT_ONLY not in response
policy = getattr(settings, "CONTENT_SECURITY_POLICY_REPORT_ONLY", None) or {}
prefixes = policy.get("EXCLUDE_URL_PREFIXES", None) or ()
is_not_excluded = not request.path_info.startswith(tuple(prefixes))
if no_header and is_not_exempt and is_not_excluded:
response[HEADER_REPORT_ONLY] = csp_ro
# Once we've written the header, accessing the `request.csp_nonce` will no longer trigger
# the nonce to be added to the header. Instead we throw an error here to catch this since
# this has security implications.
if getattr(request, "_csp_nonce", None) is None:
setattr(request, "csp_nonce", CheckableLazyObject(self._csp_nonce_post_response))
return response
def build_policy(self, request: HttpRequest, response: HttpResponseBase) -> str:
warnings.warn("deprecated in favor of get_policy_parts", DeprecationWarning)
policy_parts = self.get_policy_parts(request=request, response=response, report_only=False)
return build_policy(**asdict(policy_parts))
def build_policy_ro(self, request: HttpRequest, response: HttpResponseBase) -> str:
warnings.warn("deprecated in favor of get_policy_parts", DeprecationWarning)
policy_parts_ro = self.get_policy_parts(request=request, response=response, report_only=True)
return build_policy(**asdict(policy_parts_ro), report_only=True)
def get_policy_parts(
self,
request: HttpRequest,
response: HttpResponseBase,
report_only: bool = False,
) -> PolicyParts:
if report_only:
config = getattr(response, "_csp_config_ro", None)
update = getattr(response, "_csp_update_ro", None)
replace = getattr(response, "_csp_replace_ro", None)
else:
config = getattr(response, "_csp_config", None)
update = getattr(response, "_csp_update", None)
replace = getattr(response, "_csp_replace", None)
nonce = getattr(request, "_csp_nonce", None)
return PolicyParts(config, update, replace, nonce)