in runtime_scripts/replay.py [0:0]
def process_flows(self):
"""
Process the recorded flows, separating them into filtered and other flows.
"""
# First, decompress the content of the flows
for flow in self.recorded_flows:
if not flow["response"]:
continue
flow["response"]["content"] = self.decompress_gzip(
flow["response"]["content"], flow["response"]["headers"]
)
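        # A str body may still contain unpaired UTF-16 surrogates (e.g. from
        # "\ud83d..."-style JSON escapes); round-tripping through UTF-16 with
        # "surrogatepass" merges them back into real code points.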
if isinstance(flow["response"]["content"], str):
flow["response"]["content"] = (
flow["response"]["content"]
.encode("utf-16", "surrogatepass")
.decode("utf-16")
)
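        # Undo JSON-style escaping of forward slashes ("\/" -> "/").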
flow["response"]["content"] = flow["response"]["content"].replace(
r"\/", "/"
)
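    # Classify flows: websocket traffic to known hosts, non-Expensify hosts,
    # paths that never need replaying, chat attachments, and the remaining
    # (replayable) flows.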
for flow in self.recorded_flows:
if "method" not in flow["request"]:
continue
elif flow["websocket"] and any(
host in flow["server_conn"]["sni"] for host in WEBSOCKET_HOSTS
):
self.ws_flows.append(flow)
elif flow["server_conn"]["sni"] not in EXPENSIFY_HOSTS:
self.other_flows.append(flow)
elif any(
flow["request"]["path"].startswith(path) for path in UNNECESSARY_PATHS
):
self.other_flows.append(flow)
elif any(
flow["request"]["path"].startswith(path)
for path in CHAT_ATTACHMENTS_PATHS
):
self.attachment_flows.append(flow)
else:
self.filtered_flows.append({"flow": flow, "marked": False})
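            # Pusher authentication flows are indexed separately but also
            # remain in filtered_flows.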
if any(
flow["request"]["path"].startswith(path)
for path in PUSHER_AUTHENTICATION_PATHS
):
self.pusher_auth_flows.append(flow)
    # Extract the Date header from each flow in both lists to find the
    # earliest timestamp
dates = []
for flow in self.filtered_flows:
date = self.get_date_from_recorded_flow(flow["flow"])
if date:
dates.append(date)
for flow in self.other_flows:
date = self.get_date_from_recorded_flow(flow)
if date:
dates.append(date)
    if dates:
        self.smallest_date = min(dates)
        # calendar.timegm treats the struct_time as UTC, yielding a Unix
        # timestamp for the earliest recorded Date header.
        self.smallest_date_ts = calendar.timegm(self.smallest_date.timetuple())
    # For attachment flows, remove everything after "?" in the path:
    # the query string is the authentication token, which we don't need
    # to store.
for flow in self.attachment_flows:
path = flow["request"]["path"].decode().split("?")[0]
flow["request"]["path"] = path.encode()
# For WS flows, keep only server-initiated messages
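    # In the stored message tuples, msg[1] is the from_client flag, so
    # keeping only entries where it is False drops client-sent messages.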
for flow in self.ws_flows:
messages = flow["websocket"]["messages"]
flow["websocket"]["messages"] = [
list(msg) for msg in messages if msg[1] is False
]
flow["websocket"]["reserved"] = False
# Sort the flows by timestamp
self.filtered_flows.sort(key=lambda x: x["flow"]["request"]["timestamp_start"])
self.other_flows.sort(key=lambda x: x["request"]["timestamp_start"])
    # Further bucket flows by the email address found in form-encoded
    # request bodies
self.email_based_flows = defaultdict(list)
for flow_entry in self.filtered_flows:
flow = flow_entry["flow"]
headers = self.convert_headers_to_dict(flow["request"]["headers"])
if self.is_multipart_form_data(headers) or self.is_x_www_form_urlencoded(
headers
):
email = self.get_email_from_request(
flow["request"]["content"],
headers.get(b"content-type", b"").decode("utf-8"),
)
if email:
self.email_based_flows[email].append(flow_entry)
    # Further bucket flows to de-duplicate repeated calls: for each
    # unique-ID key, cache only the last flow seen per
    # (value, clientUpdateID) pair.
    self.duplicate_handle_flows = defaultdict(dict)
    for flow_entry in self.filtered_flows:
        flow = flow_entry["flow"]
        if not any(
            path in flow["request"]["path"] for path in DUPLICATE_HANDLE_PATHS
        ):
            continue
        headers = self.convert_headers_to_dict(flow["request"]["headers"])
        # The unique IDs are independent of the key, so extract them once
        # per flow rather than once per key.
        unique_ids = self.extract_unique_ids(
            flow["request"]["content"],
            headers.get(b"content-type", b"").decode("utf-8"),
        )
        update_id = unique_ids.get("clientUpdateID")
        for key in DUPLICATE_HANDLE_KEYS:
            value = unique_ids.get(key)
            if value:
                # Cache the last flow seen for each key/update pair.
                self.duplicate_handle_flows[(value, update_id)] = flow_entry
# Create a map of channel to key for Pusher flows
self.create_channel_key_map()
# Decrypt Pusher messages
self.decrypt_websocket_messages()