# process_flows() — from runtime_scripts/replay.py


    def process_flows(self):
        """
        Process the recorded flows, separating them into filtered and other flows.

        Pipeline:
          1. Decompress gzip response bodies; for string content, round-trip
             through UTF-16 to normalise surrogates and unescape ``\\/``.
          2. Bucket each flow: websocket, "other" (non-Expensify hosts or
             unnecessary paths), chat attachments, or "filtered" (plus a
             pusher-auth sub-bucket).
          3. Record the smallest Date seen across filtered/other flows
             (``smallest_date`` / ``smallest_date_ts``).
          4. Strip query strings (auth tokens) from attachment paths.
          5. Keep only server-initiated websocket messages.
          6. Sort flows by request timestamp and build the email-based and
             duplicate-call indexes.
          7. Build the Pusher channel->key map and decrypt Pusher messages.
        """
        # First, decompress the content of the flows
        for flow in self.recorded_flows:
            if not flow["response"]:
                continue
            flow["response"]["content"] = self.decompress_gzip(
                flow["response"]["content"], flow["response"]["headers"]
            )
            if isinstance(flow["response"]["content"], str):
                # Round-trip through UTF-16 with "surrogatepass" so lone
                # surrogates survive re-encoding, then undo JSON-style
                # escaped forward slashes.
                flow["response"]["content"] = (
                    flow["response"]["content"]
                    .encode("utf-16", "surrogatepass")
                    .decode("utf-16")
                )
                flow["response"]["content"] = flow["response"]["content"].replace(
                    r"\/", "/"
                )

        # Bucket flows into ws / other / attachment / filtered.
        for flow in self.recorded_flows:
            if "method" not in flow["request"]:
                continue
            elif flow["websocket"] and any(
                host in flow["server_conn"]["sni"] for host in WEBSOCKET_HOSTS
            ):
                self.ws_flows.append(flow)
            elif flow["server_conn"]["sni"] not in EXPENSIFY_HOSTS:
                self.other_flows.append(flow)
            elif any(
                flow["request"]["path"].startswith(path) for path in UNNECESSARY_PATHS
            ):
                self.other_flows.append(flow)
            elif any(
                flow["request"]["path"].startswith(path)
                for path in CHAT_ATTACHMENTS_PATHS
            ):
                self.attachment_flows.append(flow)
            else:
                self.filtered_flows.append({"flow": flow, "marked": False})
                if any(
                    flow["request"]["path"].startswith(path)
                    for path in PUSHER_AUTHENTICATION_PATHS
                ):
                    self.pusher_auth_flows.append(flow)

        # Extract date from headers from both flows
        dates = []

        for flow in self.filtered_flows:
            date = self.get_date_from_recorded_flow(flow["flow"])
            if date:
                dates.append(date)

        for flow in self.other_flows:
            date = self.get_date_from_recorded_flow(flow)
            if date:
                dates.append(date)

        if dates:
            self.smallest_date = min(dates)
            self.smallest_date_ts = calendar.timegm(self.smallest_date.timetuple())

        # For attachment flows remove everything after ? in the path
        # That is the authentication token and we don't need to store that
        for flow in self.attachment_flows:
            path = flow["request"]["path"].decode().split("?")[0]
            flow["request"]["path"] = path.encode()

        # For WS flows, keep only server-initiated messages
        # (msg[1] is the from-client flag — presumably; verify against recorder)
        for flow in self.ws_flows:
            messages = flow["websocket"]["messages"]
            flow["websocket"]["messages"] = [
                list(msg) for msg in messages if msg[1] is False
            ]
            flow["websocket"]["reserved"] = False

        # Sort the flows by timestamp
        self.filtered_flows.sort(key=lambda x: x["flow"]["request"]["timestamp_start"])
        self.other_flows.sort(key=lambda x: x["request"]["timestamp_start"])

        # Further bucket into email-based flows
        self.email_based_flows = defaultdict(list)
        for flow_entry in self.filtered_flows:
            flow = flow_entry["flow"]
            headers = self.convert_headers_to_dict(flow["request"]["headers"])
            if self.is_multipart_form_data(headers) or self.is_x_www_form_urlencoded(
                headers
            ):
                email = self.get_email_from_request(
                    flow["request"]["content"],
                    headers.get(b"content-type", b"").decode("utf-8"),
                )
                if email:
                    self.email_based_flows[email].append(flow_entry)

        # Further bucket to handle duplicate calls
        self.duplicate_handle_flows = defaultdict(dict)
        for flow_entry in self.filtered_flows:
            flow = flow_entry["flow"]
            if not any(
                path in flow["request"]["path"] for path in DUPLICATE_HANDLE_PATHS
            ):
                continue
            headers = self.convert_headers_to_dict(flow["request"]["headers"])
            # The extracted ids do not depend on the key being checked, so
            # parse the request content once per flow instead of once per
            # DUPLICATE_HANDLE_KEYS entry.
            unique_ids = self.extract_unique_ids(
                flow["request"]["content"],
                headers.get(b"content-type", b"").decode("utf-8"),
            )
            update_id = unique_ids.get("clientUpdateID")
            for key in DUPLICATE_HANDLE_KEYS:
                value = unique_ids.get(key)
                if value:
                    # The goal is to cache the last flow for each key
                    self.duplicate_handle_flows[(value, update_id)] = flow_entry

        # Create a map of channel to key for Pusher flows
        self.create_channel_key_map()

        # Decrypt Pusher messages
        self.decrypt_websocket_messages()