def inject_ws()

in runtime_scripts/replay.py [0:0]


    def inject_ws(self, flow: http.HTTPFlow, ws_flow):
        """
        Inject a WebSocket message.

        Args:
            flow (http.HTTPFlow): The HTTP flow.
            ws_flow (dict): The WebSocket flow.
        """
        if not self.pusher_client:
            logger.warning("Pusher client not initialized, cannot inject WS messages.")
            return

        ws_messages = ws_flow["websocket"]["messages"]
        current_date = calendar.timegm(datetime.utcnow().timetuple())
        # Update timestamps for the WS messages
        smallest_ts = min(msg[3] for msg in ws_messages) - 1
        for msg in ws_messages:
            msg[3] = current_date + (msg[3] - smallest_ts)
        while True:
            _now = datetime.utcnow()
            _ts = calendar.timegm(_now.timetuple())
            if not ws_messages:
                logger.info(f"Finished injecting WS messages to {ws_flow['id']}")
                break
            if ws_messages[0][3] <= _ts:
                # Get JSON message
                msg = ws_messages.pop(0)
                msg_json = msg[2]
                channel_name = msg_json.get("channel", "")
                event = msg_json.get("event", "")
                if not channel_name or event.startswith("pusher"):
                    continue
                # Perform ID replacement
                for key, value in REPLACEMENT_VARS.items():
                    channel_name = channel_name.replace(key, value)
                # Perform data replacement on the data
                data = msg_json.get("data")
                if not data:
                    continue
                data = self.replace_dates(json.dumps(data))
                for key, value in REPLACEMENT_VARS.items():
                    data = data.replace(key, value)
                self.pusher_client.trigger(channel_name, event, json.loads(data))
                logger.info(f"Injected WS message to {channel_name}")
            time.sleep(0.5)