func()

in jupytertestutil/jupytertestutil.go [272:366]


func (m *mockJupyter) connectToKernel(w http.ResponseWriter, r *http.Request, kernelID string) {
	log.Printf("Handling a websocket upgrade request: %+v", r)
	m.mu.Lock()
	_, ok := m.kernels[kernelID]
	m.mu.Unlock()
	if !ok {
		log.Printf("Kernel not found: %q", kernelID)
		http.NotFound(w, r)
		return
	}
	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		Subprotocols:    []string{},
	}
	conn, err := upgrader.Upgrade(w, r, http.Header{})
	if err != nil {
		log.Printf("Failure in the websocket upgrade call: %v", err)
		return
	}
	baseCloseHandler := conn.CloseHandler()
	conn.SetCloseHandler(func(code int, test string) error {
		cancel()
		return baseCloseHandler(code, test)
	})
	defer func() {
		conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now())
		conn.Close()
	}()
	log.Printf("Negotiated websocket subprotocol: %q", conn.Subprotocol())
	clientMsgs := make(chan *KernelMessage, 1)
	go func() {
		defer func() {
			close(clientMsgs)
			cancel()
		}()
		for {
			select {
			case <-ctx.Done():
				return
			default:
				_, msgBytes, err := conn.ReadMessage()
				if err != nil {
					return
				}
				var kernelMessage KernelMessage
				if err := json.Unmarshal(msgBytes, &kernelMessage); err != nil {
					return
				}
				clientMsgs <- &kernelMessage
			}
		}
	}()
	for {
		var executionCount int
		select {
		case <-ctx.Done():
			return
		case <-time.After(5 * time.Second):
			if err := conn.WriteControl(websocket.PingMessage, []byte("ping"), time.Now()); err != nil {
				return
			}
		case msg := <-clientMsgs:
			if msg == nil {
				// The channel has been closed
				return
			}
			executionCount++
			resp := &KernelMessage{
				Header: &KernelMessageHeader{
					Date:     time.Now().Format(time.RFC3339),
					MsgID:    uuid.NewString(),
					MsgType:  "execute_reply",
					Session:  msg.Header.Session,
					Username: "user",
					Version:  "5.4",
				},
				Channel: msg.Channel,
				Content: map[string]any{
					"execution_count": executionCount,
					"payload":         []any{},
					"status":          "ok",
				},
				ParentHeadder: msg.Header,
			}
			respBytes, err := json.Marshal(resp)
			if err != nil {
				return
			}
			conn.WriteMessage(websocket.TextMessage, respBytes)
		}
	}
}