func()

in llm/go-client/frontend/handlers/chat.go [71:193]


// chatImageDataURIPattern matches a base64 image data URI of the form
// "data:image/<subtype>;base64,<payload>". Group 1 is the subtype (ASCII
// letters only, so subtypes such as "svg+xml" are rejected) and group 2 is
// the payload. It is compiled once at package scope rather than per request.
var chatImageDataURIPattern = regexp.MustCompile(`^data:image/([a-zA-Z]+);base64,([^"]+)$`)

// chatStreamEvent is one item handed from the upstream reader goroutine to
// the SSE writer: either a content chunk or the terminal stream error.
type chatStreamEvent struct {
	content string
	err     error
}

// Chat handles one chat turn and streams the model reply back to the client
// as server-sent events.
//
// The conversation is identified by the "current_context" session value; when
// it is missing, NewContext is invoked and the value is read again. The JSON
// request body carries the user message, an optional image data URI ("bin")
// and the model name. The user message is appended to the conversation
// history, the whole history is sent to the chat service, and every received
// chunk is emitted as an SSE "message" event with a "content" field.
//
// Responses:
//   - 400 when the body is not valid JSON or "bin" is not an image data URI.
//   - 500 when the context ID cannot be obtained, the configuration cannot be
//     loaded, the chat service call fails, or the upstream stream fails before
//     anything has been written.
//   - If the upstream stream fails after output has started, an SSE "error"
//     event is emitted and the stream ends.
//
// The stream also ends when no chunk arrives within the configured
// TimeoutSeconds or when the client disconnects. The accumulated reply is
// appended to the history when the upstream stream ends (with or without an
// error), but not when the request was cancelled first.
func (h *ChatHandler) Chat(c *gin.Context) {
	session := sessions.Default(c)
	ctxID, ok := session.Get("current_context").(string)
	if !ok {
		h.NewContext(c)
		ctxID, ok = session.Get("current_context").(string)
		if !ok {
			c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get context"})
			return
		}
	}

	var req struct {
		Message string `json:"message"`
		Bin     string `json:"bin"`
		Model   string `json:"model"`
	}

	if err := c.BindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid request payload"})
		return
	}

	var img string
	if len(req.Bin) > 0 {
		// this regex does not support file types like svg
		matches := chatImageDataURIPattern.FindStringSubmatch(req.Bin)
		if len(matches) != 3 {
			c.JSON(http.StatusBadRequest, gin.H{"error": "invalid base64 data format"})
			return
		}

		img = matches[2]
	}

	// Load the configuration before touching the history or starting the
	// upstream call, so a failure yields a clean 500 instead of an empty
	// response with an abandoned reader goroutine.
	cfg, err := config.GetConfig()
	if err != nil {
		err = fmt.Errorf("loading config: %w", err)
		log.Println(err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}
	idleTimeout := time.Duration(cfg.TimeoutSeconds) * time.Second

	// The payload is stored as the base64 text taken from the data URI; it is
	// not decoded here.
	h.ctxManager.AppendMessage(ctxID, &chat.ChatMessage{
		Role:    "human",
		Content: req.Message,
		Bin:     []byte(img),
	})

	// Tie the upstream call and the reader goroutine to the request: ctx is
	// cancelled when the client goes away or when this handler returns.
	ctx, cancel := context.WithCancel(c.Request.Context())
	defer cancel()

	stream, err := h.svc.Chat(ctx, &chat.ChatRequest{
		Messages: h.ctxManager.GetHistory(ctxID),
		Model:    req.Model,
	})
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
		return
	}

	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "close")

	// Buffered so the reader can run slightly ahead of the SSE writer.
	events := make(chan chatStreamEvent, 100)

	// The reader goroutine owns the stream and never touches the gin.Context:
	// all response writing happens on the handler goroutine below.
	go func() {
		defer close(events)
		defer func() {
			if err := stream.Close(); err != nil {
				log.Println("Error closing stream:", err)
			}
		}()
		defer func() {
			if r := recover(); r != nil {
				log.Printf("Recovered in stream processing: %v\n%s", r, debug.Stack())
			}
		}()

		var reply []byte
		for ctx.Err() == nil && stream.Recv() {
			content := stream.Msg().Content
			reply = append(reply, content...)
			select {
			case events <- chatStreamEvent{content: content}:
			case <-ctx.Done():
				// The consumer is gone; do not block on a full buffer.
				log.Println("Request context done, stopping stream processing")
				return
			}
		}
		if ctx.Err() != nil {
			log.Println("Request context done, stopping stream processing")
			return
		}

		recvErr := stream.Err()
		if recvErr != nil {
			log.Printf("Stream receive error: %v", recvErr)
		}
		h.ctxManager.AppendMessage(ctxID, &chat.ChatMessage{
			Role:    "ai",
			Content: string(reply),
			Bin:     nil,
		})
		if recvErr != nil {
			select {
			case events <- chatStreamEvent{err: recvErr}:
			case <-ctx.Done():
			}
		}
	}()

	// SSE stream output
	c.Stream(func(w io.Writer) bool {
		select {
		case ev, ok := <-events:
			if !ok {
				return false
			}
			if ev.err != nil {
				if c.Writer.Written() {
					// Status and headers are already on the wire; report in-band.
					c.SSEvent("error", gin.H{"error": ev.err.Error()})
				} else {
					c.JSON(http.StatusInternalServerError, gin.H{"error": ev.err.Error()})
				}
				return false
			}
			c.SSEvent("message", gin.H{"content": ev.content})
			return true
		case <-time.After(idleTimeout):
			log.Println("Stream time out")
			return false
		case <-ctx.Done():
			log.Println("Client disconnected")
			return false
		}
	})
}