in jupytertestutil/jupytertestutil.go [272:366]
// connectToKernel upgrades the request to a websocket connection and serves it
// as the message channel of the mock kernel identified by kernelID.
//
// If no kernel with that ID is registered, it responds with 404 and returns.
// Otherwise every JSON-encoded KernelMessage read from the client is answered
// with an "execute_reply" message whose "execution_count" increases by one for
// each reply sent on this connection. A ping is sent whenever the connection
// has been idle for 5 seconds. The method returns when the request context is
// cancelled, the client closes the connection, a client message cannot be
// read or decoded, or a write to the connection fails.
func (m *mockJupyter) connectToKernel(w http.ResponseWriter, r *http.Request, kernelID string) {
	// writeTimeout bounds each control-frame write. The deadline passed to
	// WriteControl must lie in the future; passing time.Now() is already
	// expired by the time the library inspects it.
	const writeTimeout = time.Second

	log.Printf("Handling a websocket upgrade request: %+v", r)
	m.mu.Lock()
	_, ok := m.kernels[kernelID]
	m.mu.Unlock()
	if !ok {
		log.Printf("Kernel not found: %q", kernelID)
		http.NotFound(w, r)
		return
	}
	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		Subprotocols:    []string{},
	}
	conn, err := upgrader.Upgrade(w, r, http.Header{})
	if err != nil {
		log.Printf("Failure in the websocket upgrade call: %v", err)
		return
	}
	// Cancel the context as soon as the peer sends a close frame so that both
	// the reader goroutine and the main loop wind down.
	baseCloseHandler := conn.CloseHandler()
	conn.SetCloseHandler(func(code int, text string) error {
		cancel()
		return baseCloseHandler(code, text)
	})
	defer func() {
		// Best effort: the peer may already be gone, so errors from the
		// closing handshake are deliberately ignored.
		_ = conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(writeTimeout))
		_ = conn.Close()
	}()
	log.Printf("Negotiated websocket subprotocol: %q", conn.Subprotocol())

	// The reader goroutine owns clientMsgs: it is the only sender and closes
	// the channel when it exits.
	clientMsgs := make(chan *KernelMessage, 1)
	go func() {
		defer func() {
			close(clientMsgs)
			cancel()
		}()
		for {
			if ctx.Err() != nil {
				return
			}
			_, msgBytes, err := conn.ReadMessage()
			if err != nil {
				return
			}
			kernelMessage := new(KernelMessage)
			if err := json.Unmarshal(msgBytes, kernelMessage); err != nil {
				log.Printf("Failure decoding a kernel message: %v", err)
				return
			}
			// Do not block forever on the send if the main loop has already
			// returned; the deferred cancel() in the handler unblocks us.
			select {
			case clientMsgs <- kernelMessage:
			case <-ctx.Done():
				return
			}
		}
	}()

	// executionCount lives outside the loop so that it keeps increasing
	// across replies instead of being reset for every message.
	executionCount := 0
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(5 * time.Second):
			// The connection has been idle; send a keep-alive ping.
			if err := conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now().Add(writeTimeout)); err != nil {
				return
			}
		case msg, ok := <-clientMsgs:
			if !ok {
				// The channel has been closed
				return
			}
			executionCount++
			respHeader := &KernelMessageHeader{
				Date:     time.Now().Format(time.RFC3339),
				MsgID:    uuid.NewString(),
				MsgType:  "execute_reply",
				Username: "user",
				Version:  "5.4",
			}
			// A client message without a header must not crash the mock;
			// the session is simply left at its zero value in that case.
			if msg.Header != nil {
				respHeader.Session = msg.Header.Session
			}
			resp := &KernelMessage{
				Header:  respHeader,
				Channel: msg.Channel,
				Content: map[string]any{
					"execution_count": executionCount,
					"payload":         []any{},
					"status":          "ok",
				},
				ParentHeadder: msg.Header,
			}
			respBytes, err := json.Marshal(resp)
			if err != nil {
				return
			}
			if err := conn.WriteMessage(websocket.TextMessage, respBytes); err != nil {
				log.Printf("Failure writing a reply to the websocket: %v", err)
				return
			}
		}
	}
}