in src/handlers/action_handler.go [28:113]
func ActionStreamHandler(streamingProxyAddr string, apihost string) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx, done := context.WithCancel(r.Context())
namespace, actionToInvoke := getNamespaceAndAction(r)
log.Printf("Private Action request: %s (%s)", actionToInvoke, namespace)
apiKey, err := extractAuthToken(r)
if err != nil {
log.Println(err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
done()
return
}
// Create OpenWhisk client
client := NewOpenWhiskClient(apihost, apiKey, namespace)
// opens a socket for listening in a random port
sock, err := tcp.SetupTcpServer(ctx, streamingProxyAddr)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
done()
return
}
enrichedBody, err := injectHostPortInBody(r, sock.Host, sock.Port)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
done()
return
}
// invoke the action
_, httpResp, err := client.Actions.Invoke(actionToInvoke, enrichedBody, false, false)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
done()
return
}
// We need to handle status in the range from 200 to 299
// as success, and everything else as an error.
// In particular, we need to handle 202 Accepted
// as a success, because the action is invoked
// asynchronously and the response is not available yet.
// We also need to handle 204 No Content as a success,
// because the action is invoked and there is no response.
// It seems that the invoker is releasing a 202 Accepted
// after 60 seconds, so we need to handle that as well.
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
http.Error(w, "Error invoking action: "+httpResp.Status, http.StatusInternalServerError)
done()
return
}
// Flush the headers
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
done()
return
}
for {
select {
case data, isChannelOpen := <-sock.StreamDataChan:
if !isChannelOpen {
done()
return
}
_, err := w.Write([]byte(string(data) + "\n"))
if err != nil {
http.Error(w, "failed to write data: "+err.Error(), http.StatusInternalServerError)
done()
return
}
flusher.Flush()
case <-r.Context().Done():
log.Println("HTTP Client closed connection")
done()
return
}
}
}
}