kernels-mixer/kernels_mixer/websockets.py (67 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime import json import uuid from tornado.escape import json_decode, utf8 from jupyter_server.gateway.connections import GatewayWebSocketConnection from jupyter_server.gateway.managers import GatewayKernelManager from jupyter_server.services.kernels.connection.base import BaseKernelWebsocketConnection from jupyter_server.services.kernels.connection.channels import ZMQChannelsWebsocketConnection class StartingReportingWebsocketConnection(GatewayWebSocketConnection): """Extension of GatewayWebSocketConnection that reports a starting status on connection. The purpose of this class is to bridge the time period between when the websocket connection from the client is created and the websocket connection to the Gateway server is created. During that time, the JupyterLab UI believes that it is connected to the running kernel, but it has not yet received any status messages from the kernel, so it will display a kernel status of "Unknown". That "Unknown" status is not very helpful to users because they have no idea why the kernel status is unknown and have no indication that something is still happening under the hood. To improve that, we report a provisional status as soon as the client connection is established. This provisional status will be replaced by the real status reported by the kernel as soon as the backend kernel connection is established. The only kernel statuses supported by the JupyterLab UI are "starting", "idle", "busy", "restarting", and "dead". Of those, the "starting" message is the closest match to what is going on, so we use that one. However, the "starting" message is only supposed to be reported once, so we also intercept any "starting" messages received from the kernel and discard them, as we know we will have already reported this status. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) async def connect(self): # The kernel message format is defined # [here](https://jupyter-client.readthedocs.io/en/latest/messaging.html#general-message-format). status_message_id = str(uuid.uuid4()) status_message = { "header": { "msg_id": status_message_id, "session": self.kernel_id, "username": "username", "date": datetime.datetime.utcnow().isoformat(), "msg_type": "status", "version": "5.3", }, "parent_header": {}, "metadata": {}, "msg_id": status_message_id, "msg_type": "status", "channel": "iopub", "content": { "execution_state": "starting", }, "buffers": [], } super().handle_outgoing_message(json.dumps(status_message)) return await super().connect() def is_starting_message(self, incoming_msg): try: msg = json_decode(utf8(incoming_msg)) if msg.get("content", {}).get("execution_state", "") == "starting": return True except Exception as ex: pass return False def handle_outgoing_message(self, incoming_msg, *args, **kwargs): if self.is_starting_message(incoming_msg): # We already sent a starting message, so drop this one. return return super().handle_outgoing_message(incoming_msg, *args, **kwargs) class DelegatingWebsocketConnection(BaseKernelWebsocketConnection): """Implementation of BaseKernelWebsocketConnection that delegates to another connection. If the parent KernelManager instance is for a remote kernel (i.e. it is a GatewayKernelManager), then the delegate is an instance of StartingReportingWebsocketConnection, which extends GatewayWebSocketConnection. Otherwise, it is an instance of ZMQChannelsWebsocketConnection. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) delegate_class = ZMQChannelsWebsocketConnection if self.kernel_manager.is_remote: delegate_class = StartingReportingWebsocketConnection self.delegate = delegate_class( parent=self.kernel_manager.delegate, websocket_handler=self.websocket_handler, config=self.config) async def connect(self): return await self.delegate.connect() def disconnect(self): return self.delegate.disconnect() def handle_incoming_message(self, msg): return self.delegate.handle_incoming_message(msg) def handle_outgoing_message(self, stream, msg): return self.delegate.handle_outgoing_message(stream, msg) # Prepare actually comes from ZMQChannelsWebsocketConnection. # # It is called by the jupyter_server kernels websocket handler if present, so # we provide an implemention of it in case the delegate is an instance of the # ZMQChannelWebsocketConnection class. async def prepare(self): if hasattr(self.delegate, "prepare"): return await self.delegate.prepare()