in src/handlers/web_action_handler.go [31:114]
func WebActionStreamHandler(streamingProxyAddr string, apihost string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx, done := context.WithCancel(r.Context())
namespace, actionToInvoke := getNamespaceAndAction(r)
log.Printf("Web Action requested: %s (%s)", actionToInvoke, namespace)
// opens a socket for listening in a random port
sock, err := tcp.SetupTcpServer(ctx, streamingProxyAddr)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
done()
return
}
// parse the json body and add STREAM_HOST and STREAM_PORT
enrichedBody, err := injectHostPortInBody(r, sock.Host, sock.Port)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
done()
return
}
// invoke the action
actionToInvoke = ensurePackagePresent(actionToInvoke)
jsonData, err := json.Marshal(enrichedBody)
if err != nil {
http.Error(w, "Error encoding JSON body: "+err.Error(), http.StatusInternalServerError)
done()
return
}
url := fmt.Sprintf("%s/api/v1/web/%s/%s", apihost, namespace, actionToInvoke)
// Read headers and set them in the request
headers := make(map[string]string)
for key, values := range r.Header {
// Use the first value for the header
if len(values) > 0 {
headers[key] = values[0]
}
}
errChan := make(chan error)
defer close(errChan)
go asyncPostWebAction(errChan, url, jsonData, headers)
// Flush the headers
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
done()
return
}
for {
select {
case data, isChannelOpen := <-sock.StreamDataChan:
if !isChannelOpen {
done()
return
}
_, err := w.Write([]byte(string(data) + "\n"))
if err != nil {
http.Error(w, "failed to write data: "+err.Error(), http.StatusInternalServerError)
done()
return
}
flusher.Flush()
case <-r.Context().Done():
log.Println("HTTP Client closed connection")
done()
return
case err := <-errChan:
log.Println("Error invoking action:", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
done()
return
}
}
}
}