openwhisk/runHandler.go (217 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package openwhisk import ( "bytes" "encoding/json" "fmt" "io" "net/http" "os" ) type runRequest struct { ActionCodeHash string `json:"actionCodeHash,omitempty"` Value map[string]interface{} `json:"value,omitempty"` } // ErrResponse is the response when there are errors type ErrResponse struct { Error string `json:"error"` } type remoteRunChanPayload struct { runRequest *runRequest respChan chan *ServerRunResponseChanPayload } type ServerRunResponseChanPayload struct { runResp *RemoteRunResponse status int err error } type RemoteRunResponse struct { Response json.RawMessage `json:"response"` Out string `json:"out"` Err string `json:"err"` } func sendError(w http.ResponseWriter, code int, cause string) { errResponse := ErrResponse{Error: cause} b, err := json.Marshal(errResponse) if err != nil { b = []byte("error marshalling error response") Debug(err.Error()) } w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) w.Write(b) w.Write([]byte("\n")) } func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) { if ap.proxyMode == ProxyModeNone { ap.doRun(w, r) return } if ap.proxyMode == ProxyModeClient { ap.ForwardRunRequest(w, r) return } if ap.proxyMode == ProxyModeServer { // parse the request body, err := io.ReadAll(r.Body) defer r.Body.Close() if err != nil { sendError(w, http.StatusBadRequest, fmt.Sprintf("Error reading request body: %v", err)) return } var runRequest runRequest err = json.NewDecoder(bytes.NewReader(body)).Decode(&runRequest) if err != nil { sendError(w, http.StatusBadRequest, fmt.Sprintf("Error decoding run body: %v", err)) return } if runRequest.ActionCodeHash == "" { sendError(w, http.StatusBadRequest, "Action code hash not provided from client") return } innerActionProxy, ok := ap.serverProxyData.actions[runRequest.ActionCodeHash] if !ok { Debug("Action hash %s not found in server proxy data", runRequest.ActionCodeHash) sendError(w, http.StatusNotFound, "Action not found in remote runtime. Check logs for details.") return } // Enqueue the request to be processed by the inner proxy one at a time responseChan := make(chan *ServerRunResponseChanPayload) innerActionProxy.runRequestQueue <- &remoteRunChanPayload{runRequest: &runRequest, respChan: responseChan} res := <-responseChan if res.err != nil { sendError(w, res.status, res.err.Error()) return } // write response // turn response struct into json responsePayload, err := json.Marshal(res.runResp) if err != nil { sendError(w, http.StatusInternalServerError, fmt.Sprintf("Error marshalling response: %v", err)) return } // write response w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Length", fmt.Sprintf("%d", len(responsePayload))) numBytesWritten, err := w.Write(responsePayload) // handle writing errors if err != nil { Debug("(remote) Error writing response: %v", err) sendError(w, http.StatusInternalServerError, fmt.Sprintf("Error writing response: %v", err)) return } if numBytesWritten < len(responsePayload) { Debug("(remote) Only wrote %d of %d bytes to response", numBytesWritten, len(responsePayload)) sendError(w, http.StatusInternalServerError, fmt.Sprintf("Only wrote %d of %d bytes to response", numBytesWritten, len(responsePayload))) return } // flush output if possible if f, ok := w.(http.Flusher); ok { f.Flush() } close(responseChan) } } func startListenToRunRequests(ap *ActionProxy, runRequestQueue chan *remoteRunChanPayload) { for runReq := range runRequestQueue { remoteResponse, status, err := ap.doServerModeRun(runReq.runRequest) runReq.respChan <- &ServerRunResponseChanPayload{runResp: &remoteResponse, status: status, err: err} } } func (ap *ActionProxy) doServerModeRun(bodyRequest *runRequest) (RemoteRunResponse, int, error) { Debug("Executing run request in server mode") body, status, err := prepareRemoteRunBody(ap, bodyRequest) if err != nil { return RemoteRunResponse{}, status, err } // execute the action response, err := ap.theExecutor.Interact(body) // check for early termination if err != nil { Debug("WARNING! Command exited") ap.theExecutor = nil return RemoteRunResponse{}, http.StatusBadRequest, fmt.Errorf("command exited") } DebugLimit("received (remote): ", response, 120) // check if the answer is an object map or array if ok := isJsonObjOrArray(response); !ok { return RemoteRunResponse{}, http.StatusBadGateway, fmt.Errorf("the action did not return a dictionary or array") } // Get the stdout and stderr from the executor outStr, err := os.ReadFile(ap.outFile.Name()) if err != nil { outStr = []byte(fmt.Sprintf("Error reading stdout: %v", err)) } errStr, err := os.ReadFile(ap.errFile.Name()) if err != nil { errStr = []byte(fmt.Sprintf("Error reading stderr: %v", err)) } // clear out the files os.Truncate(ap.outFile.Name(), 0) os.Truncate(ap.errFile.Name(), 0) // create the response struct remoteResponse := RemoteRunResponse{ Response: response, Out: string(outStr), Err: string(errStr), } return remoteResponse, 0, nil } func (ap *ActionProxy) doRun(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) defer r.Body.Close() if err != nil { sendError(w, http.StatusBadRequest, fmt.Sprintf("Error reading request body: %v", err)) return } Debug("done reading %d bytes", len(body)) body = bytes.Replace(body, []byte("\n"), []byte(""), -1) // check if you have an action if ap.theExecutor == nil { sendError(w, http.StatusInternalServerError, "no action defined yet") return } // check if the process exited if ap.theExecutor.Exited() { sendError(w, http.StatusInternalServerError, "command exited") return } // execute the action response, err := ap.theExecutor.Interact(body) // check for early termination if err != nil { Debug("WARNING! Command exited") ap.theExecutor = nil sendError(w, http.StatusBadRequest, "command exited") return } DebugLimit("received:", response, 120) // check if the answer is an object map or array if ok := isJsonObjOrArray(response); !ok { sendError(w, http.StatusBadGateway, "The action did not return a dictionary or array.") return } // write response w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Length", fmt.Sprintf("%d", len(response))) numBytesWritten, err := w.Write(response) // flush output if possible if f, ok := w.(http.Flusher); ok { f.Flush() } // handle writing errors if err != nil { Debug("Error writing response: %v", err) sendError(w, http.StatusInternalServerError, fmt.Sprintf("Error writing response: %v", err)) return } if numBytesWritten != len(response) { Debug("Only wrote %d of %d bytes to response", numBytesWritten, len(response)) sendError(w, http.StatusInternalServerError, fmt.Sprintf("Only wrote %d of %d bytes to response", numBytesWritten, len(response))) return } } func isJsonObjOrArray(response []byte) bool { var objmap map[string]*json.RawMessage var objarray []interface{} err := json.Unmarshal(response, &objmap) if err != nil { err = json.Unmarshal(response, &objarray) if err != nil { return false } } return true } func prepareRemoteRunBody(ap *ActionProxy, bodyRequest *runRequest) ([]byte, int, error) { var bodyBuf bytes.Buffer err := json.NewEncoder(&bodyBuf).Encode(bodyRequest) if err != nil { return nil, http.StatusBadRequest, fmt.Errorf("error encoding proxied run body: %v", err) } body := bytes.Replace(bodyBuf.Bytes(), []byte("\n"), []byte(""), -1) // check if you have an action if ap.theExecutor == nil { return nil, http.StatusInternalServerError, fmt.Errorf("no action defined yet") } // check if the process exited if ap.theExecutor.Exited() { return nil, http.StatusInternalServerError, fmt.Errorf("command exited") } return body, 0, nil }