in runtime_scripts/replay.py [0:0]
def inject_ws(self, flow: http.HTTPFlow, ws_flow):
    """
    Replay the recorded WebSocket messages of a flow through Pusher.

    The recorded timestamps are shifted so that the earliest message
    becomes due one second after this call starts and the remaining
    messages keep their recorded offsets from it. Messages are then
    triggered in recorded order, each one once its shifted timestamp has
    been reached.

    Messages without a channel, without data, or whose event name starts
    with "pusher" are skipped. ``ws_flow`` is only read; it is never
    modified, so the same flow can be replayed more than once.

    Args:
        flow (http.HTTPFlow): The HTTP flow. Not used by this method.
        ws_flow (dict): The WebSocket flow. Must provide ``id`` and
            ``websocket.messages``; each message is indexed as
            ``msg[2]`` (dict with ``channel`` / ``event`` / ``data``)
            and ``msg[3]`` (timestamp, presumably epoch seconds --
            confirm against the recorder).
    """
    if not self.pusher_client:
        logger.warning("Pusher client not initialized, cannot inject WS messages.")
        return

    recorded = ws_flow["websocket"]["messages"]
    # Whole-second UTC epoch "now"; every poll below uses the same clock.
    start_ts = int(time.time())
    # `default` keeps an empty recording from raising ValueError in min().
    earliest_ts = min((msg[3] for msg in recorded), default=0) - 1
    # Local (due time, payload) schedule, so the caller's message list and
    # its recorded timestamps are left untouched.
    pending = [(start_ts + (msg[3] - earliest_ts), msg[2]) for msg in recorded]

    next_idx = 0
    while next_idx < len(pending):
        due_ts, msg_json = pending[next_idx]
        if due_ts > int(time.time()):
            # Head of the schedule is not due yet: poll again shortly.
            time.sleep(0.5)
            continue
        next_idx += 1

        channel_name = msg_json.get("channel", "")
        event = msg_json.get("event", "")
        if not channel_name or event.startswith("pusher"):
            continue
        # Perform ID replacement on the channel name
        for key, value in REPLACEMENT_VARS.items():
            channel_name = channel_name.replace(key, value)

        data = msg_json.get("data")
        if not data:
            continue
        # Date and ID replacement work on the serialized JSON text, so the
        # payload is dumped, rewritten, and parsed back before sending.
        data = self.replace_dates(json.dumps(data))
        for key, value in REPLACEMENT_VARS.items():
            data = data.replace(key, value)

        self.pusher_client.trigger(channel_name, event, json.loads(data))
        logger.info(f"Injected WS message to {channel_name}")
        # Pause between consecutive injections.
        time.sleep(0.5)

    logger.info(f"Finished injecting WS messages to {ws_flow['id']}")