def replace_unique_ids()

in runtime_scripts/replay.py [0:0]


    def replace_unique_ids(self, request_flow, recorded_flow):
        """
        Rewrite the recorded response content so it matches the incoming request.

        IDs extracted from the recorded request are paired, by name, with the IDs
        extracted from the incoming request. Each new pair is stored in the
        module-level ``REPLACEMENT_VARS`` mapping (recorded value -> incoming
        value), and every stored pair is then substituted into the recorded
        response content. The content is afterwards passed through the date,
        Pusher, Mapbox-token and dynamic-content helpers.

        The result is written back to ``recorded_flow["response"]["content"]``;
        nothing is returned. The method returns early, leaving the content
        untouched, when the recorded flow has no response or when the response
        content is not a ``str``. For ``ReadNewestAction`` requests whose report
        ID cannot be found in the rewritten content, the content is set to
        ``"IGNORED"``.

        Args:
            request_flow (http.HTTPFlow): The incoming request flow.
            recorded_flow (dict): The recorded flow to modify.
        """
        # Extract IDs from recorded request
        recorded_headers = self.convert_headers_to_dict(
            recorded_flow["request"]["headers"]
        )
        recorded_content = recorded_flow["request"]["content"]
        recorded_content_type = recorded_headers.get(b"content-type", b"").decode(
            "utf-8"
        )
        recorded_ids = self.extract_unique_ids(recorded_content, recorded_content_type)

        # Extract IDs from incoming request
        request_content = request_flow.request.content
        request_headers = request_flow.request.headers
        request_content_type = request_headers.get("Content-Type", "")
        request_ids = self.extract_unique_ids(request_content, request_content_type)

        if recorded_flow["response"] is None:
            flow_url = request_flow.request.url
            logger.warning(f"Missing response for flow recorded on: {flow_url}")
            return

        response_content = recorded_flow["response"]["content"]

        # Only textual responses can go through the string substitutions below.
        if not isinstance(response_content, str):
            return

        # Remember the recorded -> incoming ID pairs.
        # The snapshot of already-used replacement values is taken once, before
        # the loop, so pairs added in this call do not influence each other.
        current_repl_vars = set(REPLACEMENT_VARS.values())
        for name, value in recorded_ids.items():
            request_value = request_ids.get(name)
            # A falsy recorded value must never become a replacement key:
            # str.replace("", x) would insert x between every character of the
            # response, and str.replace(None, x) raises TypeError.
            if not value or not request_value:
                continue
            if request_value not in current_repl_vars:
                # Store the replaced value for future reference
                REPLACEMENT_VARS[value] = request_value

        # Force replacement of some keys
        # In certain scenarios, the app is requesting the BE for updates and these requests do not have any unique IDs
        # Thus, we need to force replacement so that the policy and report IDs match properly
        for key, value in REPLACEMENT_VARS.items():
            response_content = response_content.replace(key, value)

        # The ReadNewestAction endpoint needs special care: the response is only
        # usable when it refers to the report ID of the incoming request.
        # Otherwise the recorded response is flagged as "IGNORED".
        if "ReadNewestAction?" in request_flow.request.path:
            report_id = request_ids.get("reportID")
            # A request without a reportID cannot be matched against the
            # response (and `None in str` would raise TypeError), so it is
            # treated the same as "nothing replaced".
            if report_id is None or report_id not in response_content:
                recorded_flow["response"]["content"] = "IGNORED"
                return

        # Replace dates in the response content
        response_content = self.replace_dates(response_content)

        # Special case to handle Pusher flows
        if (
            "AuthenticatePusher" in request_flow.request.path
            and self.pusher_client is not None
        ):
            response_content = self.handle_pusher_flows(request_flow, response_content)

        if "GetMapboxAccessToken" in request_flow.request.path:
            response_content = self.replace_mapbox_token(response_content)

        # Replace dynamic content
        response_content = self.replace_dynamic_content(
            request_content,
            request_content_type,
            recorded_content,
            recorded_content_type,
            response_content,
        )

        # Update the response content
        recorded_flow["response"]["content"] = response_content