func WebActionStreamHandler()

in src/handlers/web_action_handler.go [31:114]


// WebActionStreamHandler returns an HTTP handler that invokes a web action and
// relays the data the action streams back to the HTTP client.
//
// For each request the handler:
//  1. sets up a listening socket through tcp.SetupTcpServer on
//     streamingProxyAddr;
//  2. hands the request plus the socket's host and port to
//     injectHostPortInBody to obtain the enriched body;
//  3. POSTs the JSON-encoded body asynchronously to
//     {apihost}/api/v1/web/{namespace}/{action}, forwarding the first value
//     of every request header;
//  4. writes each chunk received on the socket's StreamDataChan to the
//     response followed by a newline, flushing after every chunk.
//
// The handler returns when the stream channel is closed, the client
// disconnects, a write to the client fails, or the invocation reports an
// error. Failures that occur before any data has been streamed are reported
// with a 500 status; later failures are only logged because the response has
// already started.
func WebActionStreamHandler(streamingProxyAddr string, apihost string) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		// Cancel the derived context on every exit path so that anything
		// started with it (the socket server) is signalled to stop.
		ctx, done := context.WithCancel(r.Context())
		defer done()

		namespace, actionToInvoke := getNamespaceAndAction(r)
		log.Printf("Web Action requested: %s (%s)", actionToInvoke, namespace)

		// Verify streaming support before opening a socket or invoking the
		// action, so an unsupported writer does not trigger a remote call
		// whose output could never be delivered.
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
			return
		}

		// opens a socket for listening in a random port
		sock, err := tcp.SetupTcpServer(ctx, streamingProxyAddr)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		// parse the json body and add STREAM_HOST and STREAM_PORT
		enrichedBody, err := injectHostPortInBody(r, sock.Host, sock.Port)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		actionToInvoke = ensurePackagePresent(actionToInvoke)

		jsonData, err := json.Marshal(enrichedBody)
		if err != nil {
			http.Error(w, "Error encoding JSON body: "+err.Error(), http.StatusInternalServerError)
			return
		}
		actionURL := fmt.Sprintf("%s/api/v1/web/%s/%s", apihost, namespace, actionToInvoke)

		// Forward the request headers; only the first value of each header is
		// kept because the invocation helper takes a single string per key.
		headers := make(map[string]string, len(r.Header))
		for key, values := range r.Header {
			if len(values) > 0 {
				headers[key] = values[0]
			}
		}

		// The channel is buffered and deliberately never closed here: the
		// goroutine is the sender and may outlive this handler, so closing it
		// from the receiving side would make a late send panic, and an
		// unbuffered channel would block that send forever.
		// NOTE(review): assumes asyncPostWebAction sends at most one value and
		// does not close the channel itself — confirm against its definition.
		errChan := make(chan error, 1)
		go asyncPostWebAction(errChan, actionURL, jsonData, headers)

		// streamed records whether a chunk has been written to the client.
		// Once it has, the status line is already sent and http.Error can no
		// longer produce a meaningful error response.
		streamed := false

		for {
			select {
			case data, isChannelOpen := <-sock.StreamDataChan:
				if !isChannelOpen {
					return
				}
				if _, err := w.Write([]byte(string(data) + "\n")); err != nil {
					// The connection is unusable; there is nobody left to
					// send an error response to.
					log.Println("failed to write data:", err)
					return
				}
				streamed = true
				flusher.Flush()

			case <-ctx.Done():
				// ctx is only cancelled by the deferred done() or by the
				// request context, so reaching this means the client left.
				log.Println("HTTP Client closed connection")
				return

			case err := <-errChan:
				log.Println("Error invoking action:", err)
				if !streamed {
					http.Error(w, err.Error(), http.StatusInternalServerError)
				}
				return
			}
		}
	}
}