openwhisk/stopHandler.go (129 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package openwhisk
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"path/filepath"
"strconv"
"time"
)
type stopRequest struct {
ActionCodeHash string `json:"actionCodeHash,omitempty"`
ProxiedActionID string `json:"proxiedActionID,omitempty"`
}
var setupActionPath = "/tmp"
func (ap *ActionProxy) stopHandler(w http.ResponseWriter, r *http.Request) {
if ap.proxyMode != ProxyModeServer {
sendError(w, http.StatusUnprocessableEntity, "Stop is only supported in server mode")
return
}
if ap.serverProxyData == nil {
Debug("Server proxy data not initialized... a restart might have happened!")
sendError(w, http.StatusInternalServerError, "Server proxy data not initialized")
return
}
// parse the request
body, err := io.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
sendError(w, http.StatusBadRequest, fmt.Sprintf("Error reading request body: %v", err))
return
}
var stopRequest stopRequest
err = json.NewDecoder(bytes.NewReader(body)).Decode(&stopRequest)
if err != nil {
sendError(w, http.StatusBadRequest, fmt.Sprintf("Error decoding run body: %v", err))
return
}
innerAPValue, ok := ap.serverProxyData.actions[stopRequest.ActionCodeHash]
if !ok {
Debug("Action hash '%s' not found in server proxy data", stopRequest.ActionCodeHash)
sendError(w, http.StatusNotFound, "Action to be removed in remote runtime not found. Check logs for details.")
return
}
connectedIDFound := false
for i, connectedID := range innerAPValue.connectedActionIDs {
if connectedID == stopRequest.ProxiedActionID {
// remove id from the array
innerAPValue.connectedActionIDs = removeID(innerAPValue.connectedActionIDs, i)
connectedIDFound = true
break
}
}
if !connectedIDFound {
Debug("Action ID '%s' not found in server proxy data", stopRequest.ProxiedActionID)
sendError(w, http.StatusNotFound, "Action to be removed in remote runtime not found. Check logs for details.")
return
}
Debug("Removed action ID. Length of connectedActionIDs: %d", len(innerAPValue.connectedActionIDs))
if len(innerAPValue.connectedActionIDs) == 0 {
if isSetupActionRunning(stopRequest.ActionCodeHash) {
go ap.timedDelete(stopRequest.ActionCodeHash)
} else {
stopAndDelete(ap, innerAPValue, stopRequest.ActionCodeHash)
}
}
sendOK(w)
}
func stopAndDelete(ap *ActionProxy, innerAPValue *RemoteAPValue, actionCodeHash string) {
Debug("Action hash '%s' executor stopped", actionCodeHash)
close(innerAPValue.runRequestQueue)
cleanUpAP(innerAPValue.remoteProxy)
delete(ap.serverProxyData.actions, actionCodeHash)
}
func cleanUpAP(ap *ActionProxy) {
ap.theExecutor.Stop()
if err := os.RemoveAll(filepath.Join(ap.baseDir, strconv.Itoa(ap.currentDir))); err != nil {
Debug("Error removing action directory: %v", err)
}
}
func removeID(idArray []string, index int) []string {
ret := make([]string, 0)
ret = append(ret, idArray[:index]...)
return append(ret, idArray[index+1:]...)
}
func isSetupActionRunning(actionCodeHash string) bool {
// A setup action is running if
// - the file "/tmp/{hash}" exists
// - the file "/tmp/{hash}_done" does not exist
path, err := filepath.Abs(filepath.Join(setupActionPath, actionCodeHash))
if err != nil {
Debug("Error getting 'setup check file' absolute path: %v", err)
return false
}
_, err = os.Stat(path)
if errors.Is(err, fs.ErrNotExist) {
return false
}
setupDoneFile := path + "_done"
_, err = os.Stat(setupDoneFile)
return errors.Is(err, fs.ErrNotExist)
}
var timeToDeletion = 10 * time.Minute
// timedDelete waits for a certain amount of time before deleting the action.
// The deletion is only done if no new actions have joined.
// The timer duration can be set using the OW_DELETE_DURATION environment variable.
//
// A duration string is a possibly signed sequence of decimal numbers,
// each with optional fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m".
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
func (serverAp *ActionProxy) timedDelete(actionCodeHash string) {
innerAPValue, ok := serverAp.serverProxyData.actions[actionCodeHash]
if !ok {
return
}
if len(innerAPValue.connectedActionIDs) > 0 {
return
}
timerDuration := timeToDeletion
deleleTimerMs := os.Getenv("OW_DELETE_DURATION")
if deleleTimerMs != "" {
dur, err := time.ParseDuration(deleleTimerMs)
if err != nil {
Debug("Error parsing OW_DELETE_DURATION: %v", err)
} else {
timerDuration = dur
}
}
Debug("Starting wait cycle for stopping hash '%s'", actionCodeHash)
<-time.After(timerDuration)
Debug("Ended wait cycle for stopping hash '%s'", actionCodeHash)
if len(innerAPValue.connectedActionIDs) == 0 {
stopAndDelete(serverAp, innerAPValue, actionCodeHash)
return
}
Debug("Stopping request for hash '%s' skipped, as new actions joined.", actionCodeHash)
}