in runtime_scripts/replay.py [0:0]
def request(self, flow: http.HTTPFlow) -> None:
    """
    Handle incoming HTTP requests.

    A request whose URL contains one of WEBSOCKET_HOSTS is paired with the
    first recorded WebSocket flow that is not yet reserved and that matches
    either the server address or (on both sides) a "pusher" SNI; that flow is
    then handed to ``self.inject_ws`` on a new thread.

    Any other request aimed at one of EXPENSIFY_HOSTS whose path contains one
    of REQUESTS_TO_MATCH (and that is not an OPTIONS request) is answered
    from the recording returned by ``self.find_matching_response``. Fields
    missing from the recording fall back to an empty JSON body, status 200
    and no headers.

    Args:
        flow (http.HTTPFlow): The HTTP flow.
    """

    def _mentions_pusher(sni) -> bool:
        # The SNI may be None (no SNI on the connection); a bare
        # `"pusher" in None` would raise TypeError and abort the hook.
        return sni is not None and "pusher" in sni

    # First check if incoming request is for WS
    if any(host in flow.request.url for host in WEBSOCKET_HOSTS):
        logger.info(f"Intercepted WS request to {flow.request.url}")
        for ws_flow in self.ws_flows:
            if ws_flow["websocket"]["reserved"]:
                continue
            recorded_conn = ws_flow["server_conn"]
            # NOTE(review): if the recorded address was loaded from JSON it
            # is a list while the live address is presumably a tuple, in
            # which case this equality never holds - confirm against the
            # code that loads self.ws_flows.
            if recorded_conn["address"] == flow.server_conn.address or (
                _mentions_pusher(recorded_conn["sni"])
                and _mentions_pusher(flow.server_conn.sni)
            ):
                # Mark the flow first so no later request picks it again.
                ws_flow["websocket"]["reserved"] = True
                t = threading.Thread(target=self.inject_ws, args=(flow, ws_flow))
                t.start()
                WS_TASKS.add(t)
                logger.info(f"Reserved WS flow: {ws_flow['id']}")
                return

    host_matched = any(host in flow.request.pretty_host for host in EXPENSIFY_HOSTS)
    if (
        host_matched
        and any(path in flow.request.path for path in REQUESTS_TO_MATCH)
        and flow.request.method != "OPTIONS"
    ):
        logger.info(f"Intercepted request to {flow.request.url}")
        if self.current_date is None:
            # NOTE(review): utcnow() returns a naive datetime and is
            # deprecated since Python 3.12; kept because other code may
            # compare self.current_date with naive values - confirm before
            # switching to an aware datetime.
            self.current_date = datetime.utcnow()
        recorded_response = self.find_matching_response(flow)

        def _recorded(field, default):
            # Only lookup failures (missing key, None / non-mapping
            # recording) trigger the default; anything else propagates.
            try:
                return recorded_response["response"][field]
            except (KeyError, IndexError, TypeError):
                return default

        content = _recorded("content", json.dumps({}))
        response_code = _recorded("status_code", 200)
        headers = _recorded("headers", {})
        flow.response = http.Response.make(response_code, content, dict(headers))