openwhisk/initHandler.go (194 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package openwhisk
import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
)
// initBodyRequest is the "value" object of an init request as decoded from
// JSON. Every field is optional on the wire.
type initBodyRequest struct {
	// Code is the action code. doInit rejects the request when it is empty
	// and base64-decodes it when Binary is set.
	Code string `json:"code,omitempty"`

	// Binary reports whether Code is base64-encoded.
	Binary bool `json:"binary,omitempty"`

	// Main is the name of the entry point; doInit falls back to "main" when
	// it is empty.
	Main string `json:"main,omitempty"`

	// Env is handed to the action proxy through SetEnv. In server proxy mode
	// doRemoteInit also reads the OW_CODE_HASH entry from it.
	Env map[string]interface{} `json:"env,omitempty"`
}
// initRequest is the top-level JSON document accepted by initHandler.
type initRequest struct {
	// ProxiedActionID identifies the client-side action; doRemoteInit
	// refuses requests in which it is empty.
	ProxiedActionID string `json:"proxiedActionID,omitempty"`

	// Value carries the code, entry point and environment of the action.
	Value initBodyRequest `json:"value,omitempty"`
}
// sendOK writes the fixed JSON acknowledgement {"ok":true} followed by a
// newline, announcing its content type and exact length, and flushes the
// response when the writer supports flushing.
func sendOK(w http.ResponseWriter) {
	const okPayload = "{\"ok\":true}\n"

	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	hdr.Set("Content-Length", fmt.Sprintf("%d", len(okPayload)))
	w.Write([]byte(okPayload))

	// Push the answer out immediately instead of waiting for the handler
	// to return.
	if flusher, canFlush := w.(http.Flusher); canFlush {
		flusher.Flush()
	}
}
// initHandler is the HTTP handler for init requests.
//
// A second initialization is refused with 403 unless Debugging is set. In
// client proxy mode the request is handed to ForwardInitRequest untouched.
// Otherwise the body is read and decoded as an initRequest (400 on failure)
// and then initialized either through doRemoteInit (server proxy mode) or
// through doInit. On success the handler answers with sendOK; on failure
// the callee has already written the error response.
func (ap *ActionProxy) initHandler(w http.ResponseWriter, r *http.Request) {
	// Re-initialization is tolerated only while debugging.
	if ap.initialized && !Debugging {
		const alreadyInitialized = "Cannot initialize the action more than once."
		sendError(w, http.StatusForbidden, alreadyInitialized)
		log.Println(alreadyInitialized)
		return
	}

	// Client mode does not look at the body at all.
	if ap.proxyMode == ProxyModeClient {
		ap.ForwardInitRequest(w, r)
		return
	}

	if ap.compiler != "" {
		Debug("compiler: " + ap.compiler)
	}

	// Read the whole payload; the body is closed when the handler returns.
	payload, readErr := io.ReadAll(r.Body)
	defer r.Body.Close()
	if readErr != nil {
		sendError(w, http.StatusBadRequest, fmt.Sprintf("%v", readErr))
		return
	}

	// Only short payloads are echoed to the debug log.
	if len(payload) < 1000 {
		Debug("init: decoding %s\n", string(payload))
	}

	var req initRequest
	if decodeErr := json.Unmarshal(payload, &req); decodeErr != nil {
		sendError(w, http.StatusBadRequest, fmt.Sprintf("Error unmarshaling request: %v", decodeErr))
		return
	}

	// Server mode: initialize (or reuse) a nested proxy for this action.
	if ap.proxyMode == ProxyModeServer {
		if ap.serverProxyData == nil {
			ap.serverProxyData = &ServerProxyData{actions: make(map[RemoteAPKey]*RemoteAPValue)}
		}
		if doRemoteInit(ap, req, w) {
			sendOK(w)
		}
		return
	}

	// Plain mode: initialize this proxy directly.
	if initErr := ap.doInit(req, w); initErr != nil {
		Debug("Error initializing action: %v", initErr)
		return
	}
	sendOK(w)
}
// doRemoteInit initializes a remote action inside the server-mode proxy ap.
//
// The action is identified by the string stored under OW_CODE_HASH in the
// request environment, and the request must carry a non-empty
// ProxiedActionID; if either is missing a 502 error is written to w. When an
// action with the same hash is already registered, the action ID is simply
// attached to it. Otherwise a nested ActionProxy is created with its own
// temporary log files (falling back to ap's files when they cannot be
// created), initialized through doInit, registered under the hash, and a
// goroutine is started to serve its run requests.
//
// Returns true if the initialization was successful, false otherwise. On
// failure the error response has already been written to w. On success
// nothing is written: sending the OK reply is left to the caller, so that it
// is sent exactly once.
func doRemoteInit(ap *ActionProxy, request initRequest, w http.ResponseWriter) bool {
	Debug("Remote initialization started.")

	// Get the action code hash from the client request.
	actionCodeHash, ok := request.Value.Env[OW_CODE_HASH].(string)
	if !ok {
		sendError(w, http.StatusBadGateway, "Cannot identify the action in remote runtime (missing hash).")
		return false
	}

	if request.ProxiedActionID == "" {
		sendError(w, http.StatusBadGateway, "Missing action id from client.")
		return false
	}
	Debug("Action code hash extracted: %s", actionCodeHash)

	// Check if the action is already initialized. The caller answers OK when
	// true is returned, so no response is written here; writing one as well
	// would emit the OK body twice.
	if nestedAP, ok := ap.serverProxyData.actions[actionCodeHash]; ok {
		Debug("Action already initialized. Added action ID %s to action hash %s", request.ProxiedActionID, actionCodeHash)
		nestedAP.connectedActionIDs = append(nestedAP.connectedActionIDs, request.ProxiedActionID)
		return true
	}

	// Give the nested proxy its own log files, falling back to the outer
	// proxy's files when a temporary file cannot be created. Remember which
	// files were created here, as only those may be closed on failure.
	outLog, err := os.CreateTemp("", "out-log")
	ownsOutLog := err == nil
	if !ownsOutLog {
		outLog = ap.outFile
	}

	errLog, err := os.CreateTemp("", "err-log")
	ownsErrLog := err == nil
	if !ownsErrLog {
		errLog = ap.errFile
	}

	Debug("Creating nested action proxy...")
	innerActionProxy := NewActionProxy(ap.baseDir, ap.compiler, outLog, errLog, ProxyModeNone)

	if err := innerActionProxy.doInit(request, w); err != nil {
		// The nested proxy is dropped, so release the descriptors opened
		// above. The files stay on disk because doInit may have written
		// diagnostics to them; close errors are irrelevant at this point.
		if ownsOutLog {
			outLog.Close()
		}
		if ownsErrLog {
			errLog.Close()
		}
		return false
	}

	remote := &RemoteAPValue{
		remoteProxy:        innerActionProxy,
		connectedActionIDs: []string{request.ProxiedActionID},
		runRequestQueue:    make(chan *remoteRunChanPayload, 50), // size could be determined empirically
	}
	ap.serverProxyData.actions[actionCodeHash] = remote

	Debug("Started listening to run requests for AP with code hash %s...", actionCodeHash)
	go startListenToRunRequests(innerActionProxy, remote.runRequestQueue)

	Debug("Added action id %s to action hash %s", request.ProxiedActionID, actionCodeHash)
	return true
}
// doInit initializes this proxy from an already decoded init request.
//
// It rejects a request without code (403), passes the request environment to
// the proxy, base64-decodes the code when it is flagged as binary (400 on a
// decoding failure), extracts and possibly compiles it, and finally starts
// the action. Extraction, compilation and start failures are answered with
// 502. When the OW_LOG_INIT_ERROR environment variable is non-empty the
// error text goes to the proxy's error file, followed by OutputGuard on both
// log files, and the client only receives a generic message.
//
// On any failure the error response has already been written to w and the
// error is returned; on success the proxy is marked initialized and nil is
// returned without writing to w.
func (ap *ActionProxy) doInit(request initRequest, w http.ResponseWriter) error {
	value := request.Value

	// A request without code is refused with an error response.
	if value.Code == "" {
		sendError(w, http.StatusForbidden, "Missing main/no code to execute.")
		return fmt.Errorf("code in body is empty")
	}

	// reportFailure answers with 502. Depending on OW_LOG_INIT_ERROR the
	// client gets either prefix+cause or, after the cause has been written
	// to the logs, the terse message.
	reportFailure := func(cause error, prefix, terse string) {
		if os.Getenv("OW_LOG_INIT_ERROR") == "" {
			sendError(w, http.StatusBadGateway, prefix+cause.Error())
			return
		}
		ap.errFile.Write([]byte(cause.Error() + "\n"))
		ap.outFile.Write([]byte(OutputGuard))
		ap.errFile.Write([]byte(OutputGuard))
		sendError(w, http.StatusBadGateway, terse)
	}

	// Hand the environment over to the action proxy.
	ap.SetEnv(value.Env)

	// Default entry point.
	entryPoint := value.Main
	if entryPoint == "" {
		entryPoint = "main"
	}

	// Obtain the raw code bytes, decoding them first when needed.
	var code []byte
	if !value.Binary {
		Debug("it is source code")
		code = []byte(value.Code)
	} else {
		Debug("it is binary code")
		decoded, decodeErr := base64.StdEncoding.DecodeString(value.Code)
		if decodeErr != nil {
			sendError(w, http.StatusBadRequest, "cannot decode the request: "+decodeErr.Error())
			return decodeErr
		}
		code = decoded
	}

	// Extract the action and compile it if a compiler is configured.
	if _, buildErr := ap.ExtractAndCompile(&code, entryPoint); buildErr != nil {
		reportFailure(buildErr, "", "The action failed to generate or locate a binary. See logs for details.")
		return buildErr
	}

	// Launch the freshly installed action.
	if startErr := ap.StartLatestAction(); startErr != nil {
		reportFailure(startErr, "cannot start action: ", "Cannot start action. Check logs for details.")
		return startErr
	}

	ap.initialized = true
	return nil
}
// ExtractAndCompile extracts the action contained in buf into a "src" folder
// and, if a compiler is defined and the action is not already compiled,
// compiles it into the sibling "bin" folder using main as the entry point.
// When no compilation is needed the "src" folder is simply renamed to "bin".
//
// It returns the path of the resulting executable ("bin/exec" next to the
// extracted sources). An error is returned when extraction fails or yields
// no file, when the sources cannot be moved into place, when compilation
// fails, or when compilation did not produce the executable.
func (ap *ActionProxy) ExtractAndCompile(buf *[]byte, main string) (string, error) {
	// extract action in src folder
	file, err := ap.ExtractAction(buf, "src")
	if err != nil {
		return "", err
	}
	if file == "" {
		return "", fmt.Errorf("empty filename")
	}

	// some path surgery: src and bin live side by side under the parent of
	// the folder holding the extracted file
	dir := filepath.Dir(file)
	parent := filepath.Dir(dir)
	srcDir := filepath.Join(parent, "src")
	binDir := filepath.Join(parent, "bin")
	binFile := filepath.Join(binDir, "exec")

	// if the file is already compiled or there is no compiler just move it from src to bin
	if ap.compiler == "" || isCompiled(file) {
		// A failed move must not be reported as success: nothing else on
		// this path verifies that binFile exists.
		if err := os.Rename(srcDir, binDir); err != nil {
			return "", fmt.Errorf("cannot install action: %w", err)
		}
		return binFile, nil
	}

	// ok let's try to compile
	Debug("compiling: %s main: %s", file, main)

	// Best effort: the error is deliberately ignored because the check on
	// binFile below fails whenever the executable was not produced.
	_ = os.Mkdir(binDir, 0755)

	err = ap.CompileAction(main, srcDir, binDir)
	if err != nil {
		return "", err
	}

	// check only if the file exist
	if _, err := os.Stat(binFile); os.IsNotExist(err) {
		return "", fmt.Errorf("cannot compile")
	}
	return binFile, nil
}