lambda/rapidcore/sandbox_builder.go (172 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package rapidcore
import (
"context"
"io"
"net"
"os"
"os/signal"
"strconv"
"syscall"
"go.amzn.com/lambda/extensions"
"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/logging"
"go.amzn.com/lambda/rapid"
"go.amzn.com/lambda/supervisor"
supvmodel "go.amzn.com/lambda/supervisor/model"
"go.amzn.com/lambda/telemetry"
log "github.com/sirupsen/logrus"
)
const (
	// defaultSigtermResetTimeoutMs is the timeout, in milliseconds, passed to
	// the default interop server's Reset call by the shutdown function that
	// NewSandboxBuilder registers.
	defaultSigtermResetTimeoutMs int64 = 2000
)
// SandboxBuilder accumulates the configuration of a rapid.Sandbox through its
// chainable Set*/Add* methods. Create then hands that configuration to
// rapid.Start.
type SandboxBuilder struct {
// sandbox is the configuration that Create passes to rapid.Start.
sandbox *rapid.Sandbox
// sandboxContext is the context built by the most recent Create call.
sandboxContext interop.SandboxContext
// lambdaInvokeAPI is created in NewSandboxBuilder from the default interop server.
lambdaInvokeAPI LambdaInvokeAPI
// defaultInteropServer is installed by Create unless SetInteropServer was called.
defaultInteropServer *Server
// useCustomInteropServer is set to true by SetInteropServer.
useCustomInteropServer bool
// shutdownFuncs are handed to the signal handler started by Create, which
// runs them in order when SIGINT or SIGTERM arrives.
shutdownFuncs []func()
// handler is copied into the SandboxContext built by Create.
handler string
}
// logSink is an integer enumeration of log sinks.
// NOTE(review): nothing in this file uses the type; presumably it tells
// runtime log output apart from extension log output — confirm at the use site.
type logSink int
const (
// RuntimeLogSink is the first logSink value (0).
RuntimeLogSink logSink = iota
// ExtensionLogSink is the second logSink value (1).
ExtensionLogSink
)
// NewSandboxBuilder returns a builder preloaded with defaults: standalone
// mode on, no-op logs egress, events and tracer implementations, telemetry
// API and init caching off, a local supervisor (whose root path is also used
// as the runtime filesystem root), and a Runtime API address of
// 127.0.0.1:9001.
//
// A default interop server is created and backs the builder's
// LambdaInvokeAPI. One shutdown function is registered up front; it logs and
// resets that default server with reason "SandboxTerminated".
func NewSandboxBuilder() *SandboxBuilder {
	interopSrv := NewServer()
	localSupervisor := supervisor.NewLocalSupervisor()

	sandboxCfg := &rapid.Sandbox{
		StandaloneMode:     true,
		LogsEgressAPI:      &telemetry.NoOpLogsEgressAPI{},
		EnableTelemetryAPI: false,
		Tracer:             telemetry.NewNoOpTracer(),
		EventsAPI:          &telemetry.NoOpEventsAPI{},
		InitCachingEnabled: false,
		Supervisor:         localSupervisor,
		RuntimeFsRootPath:  localSupervisor.RootPath,
		RuntimeAPIHost:     "127.0.0.1",
		RuntimeAPIPort:     9001,
	}

	builder := &SandboxBuilder{
		sandbox:              sandboxCfg,
		defaultInteropServer: interopSrv,
		shutdownFuncs:        []func(){},
		lambdaInvokeAPI:      NewEmulatorAPI(interopSrv),
	}

	// AddShutdownFunc returns its receiver, so the registration doubles as
	// the return value.
	return builder.AddShutdownFunc(func() {
		log.Info("Shutting down...")
		interopSrv.Reset("SandboxTerminated", defaultSigtermResetTimeoutMs)
	})
}
// SetSupervisor replaces the sandbox's process supervisor (a local supervisor
// by default) and returns the builder for chaining. It does not touch
// RuntimeFsRootPath, which NewSandboxBuilder took from the local supervisor;
// call SetRuntimeFsRootPath as well if that needs to change.
func (b *SandboxBuilder) SetSupervisor(supervisor supvmodel.ProcessSupervisor) *SandboxBuilder {
b.sandbox.Supervisor = supervisor
return b
}
// SetRuntimeFsRootPath overrides the sandbox's RuntimeFsRootPath, which
// defaults to the local supervisor's root path, and returns the builder for
// chaining.
func (b *SandboxBuilder) SetRuntimeFsRootPath(rootPath string) *SandboxBuilder {
b.sandbox.RuntimeFsRootPath = rootPath
return b
}
// SetRuntimeAPIAddress overrides the Runtime API host and port from a
// "host:port" string (the defaults are 127.0.0.1 and 9001).
//
// The address is applied only when it splits into host and port, the port is
// a decimal integer, and that integer lies in the range 0-65535. Otherwise a
// warning is logged and the previously configured host and port are left
// untouched. The builder is returned in every case so calls can be chained.
func (b *SandboxBuilder) SetRuntimeAPIAddress(runtimeAPIAddress string) *SandboxBuilder {
	const maxPort = 65535

	host, port, err := net.SplitHostPort(runtimeAPIAddress)
	if err != nil {
		log.WithError(err).Warnf("Failed to parse RuntimeApiAddress: %s:", runtimeAPIAddress)
		return b
	}

	portInt, err := strconv.Atoi(port)
	if err != nil {
		log.WithError(err).Warnf("Failed to parse RuntimeApiPort: %s:", port)
		return b
	}

	// strconv.Atoi accepts signed and very large values, so "-1" or "70000"
	// would otherwise be stored as the port. Reject them the same way as an
	// unparsable port: warn and keep the current configuration.
	if portInt < 0 || portInt > maxPort {
		log.Warnf("RuntimeApiPort out of range [0, %d]: %d", maxPort, portInt)
		return b
	}

	b.sandbox.RuntimeAPIHost = host
	b.sandbox.RuntimeAPIPort = portInt
	return b
}
// SetInteropServer installs a custom interop server on the sandbox and
// records that choice so Create does not overwrite it with the default
// server. The default server is still what DefaultInteropServer returns and
// what the shutdown function registered by NewSandboxBuilder resets. Returns
// the builder for chaining.
func (b *SandboxBuilder) SetInteropServer(interopServer interop.Server) *SandboxBuilder {
b.sandbox.InteropServer = interopServer
b.useCustomInteropServer = true
return b
}
// SetEventsAPI replaces the sandbox's EventsAPI (a no-op implementation by
// default) and returns the builder for chaining.
func (b *SandboxBuilder) SetEventsAPI(eventsAPI interop.EventsAPI) *SandboxBuilder {
b.sandbox.EventsAPI = eventsAPI
return b
}
// SetTracer replaces the sandbox's Tracer (a no-op tracer by default) and
// returns the builder for chaining.
func (b *SandboxBuilder) SetTracer(tracer telemetry.Tracer) *SandboxBuilder {
b.sandbox.Tracer = tracer
return b
}
// DisableStandaloneMode sets the sandbox's StandaloneMode to false (it
// defaults to true) and returns the builder for chaining.
func (b *SandboxBuilder) DisableStandaloneMode() *SandboxBuilder {
b.sandbox.StandaloneMode = false
return b
}
// SetExtensionsFlag turns extensions on or off by calling extensions.Enable
// or extensions.Disable. The switch lives in the extensions package rather
// than on the builder, so it takes effect immediately. Returns the builder
// for chaining.
func (b *SandboxBuilder) SetExtensionsFlag(extensionsEnabled bool) *SandboxBuilder {
	if !extensionsEnabled {
		extensions.Disable()
		return b
	}

	extensions.Enable()
	return b
}
// SetInitCachingFlag sets the sandbox's InitCachingEnabled flag (false by
// default) and returns the builder for chaining.
func (b *SandboxBuilder) SetInitCachingFlag(initCachingEnabled bool) *SandboxBuilder {
b.sandbox.InitCachingEnabled = initCachingEnabled
return b
}
// SetTelemetrySubscription enables the telemetry API on the sandbox (it is
// disabled by default) and stores the two subscription APIs: one for logs and
// one for telemetry. Returns the builder for chaining.
func (b *SandboxBuilder) SetTelemetrySubscription(logsSubscriptionAPI telemetry.SubscriptionAPI, telemetrySubscriptionAPI telemetry.SubscriptionAPI) *SandboxBuilder {
b.sandbox.EnableTelemetryAPI = true
b.sandbox.LogsSubscriptionAPI = logsSubscriptionAPI
b.sandbox.TelemetrySubscriptionAPI = telemetrySubscriptionAPI
return b
}
// SetLogsEgressAPI replaces the sandbox's LogsEgressAPI (a no-op
// implementation by default) and returns the builder for chaining.
func (b *SandboxBuilder) SetLogsEgressAPI(logsEgressAPI telemetry.StdLogsEgressAPI) *SandboxBuilder {
b.sandbox.LogsEgressAPI = logsEgressAPI
return b
}
// SetHandler records the handler string that Create copies into the
// SandboxContext it builds. Returns the builder for chaining.
func (b *SandboxBuilder) SetHandler(handler string) *SandboxBuilder {
b.handler = handler
return b
}
// AddShutdownFunc appends shutdownFunc to the list of functions that the
// signal handler runs, in registration order, when SIGINT or SIGTERM is
// received. Create hands the list to the handler as it stands at that moment,
// so functions added after Create are not seen by the running handler.
// Returns the builder for chaining.
func (b *SandboxBuilder) AddShutdownFunc(shutdownFunc func()) *SandboxBuilder {
b.shutdownFuncs = append(b.shutdownFuncs, shutdownFunc)
return b
}
// Create starts the sandbox from the accumulated configuration.
//
// It installs the default interop server unless SetInteropServer supplied a
// custom one, launches the signal-handling goroutine with the shutdown
// functions registered so far, and calls rapid.Start. It returns the
// resulting sandbox context (also stored on the builder) together with the
// internal state getter reported by rapid.Start.
func (b *SandboxBuilder) Create() (interop.SandboxContext, interop.InternalStateGetter) {
	if !b.useCustomInteropServer {
		b.sandbox.InteropServer = b.defaultInteropServer
	}

	// The signal handler calls stop once a termination signal has been
	// handled; cancelling runCtx is the cue for the Runtime API server to
	// terminate gracefully.
	runCtx, stop := context.WithCancel(context.Background())
	go signalHandler(stop, b.shutdownFuncs)

	// rapid.Start, among other things, starts the Runtime API server and
	// terminates it gracefully if runCtx is canceled.
	rapidContext, stateGetter, apiAddr := rapid.Start(runCtx, b.sandbox)

	sbCtx := &SandboxContext{
		rapidCtx:          rapidContext,
		handler:           b.handler,
		runtimeAPIAddress: apiAddr,
	}
	b.sandboxContext = sbCtx

	return b.sandboxContext, stateGetter
}
// DefaultInteropServer returns the interop server created by
// NewSandboxBuilder, whether or not SetInteropServer has installed a custom
// one on the sandbox.
func (b *SandboxBuilder) DefaultInteropServer() *Server {
return b.defaultInteropServer
}
// LambdaInvokeAPI returns the invoke API that NewSandboxBuilder created with
// NewEmulatorAPI on top of the default interop server.
func (b *SandboxBuilder) LambdaInvokeAPI() LambdaInvokeAPI {
return b.lambdaInvokeAPI
}
// SetLogLevel sets the log level for internal logging and installs the
// internal log formatter. Needs to be called very early during startup to
// configure logs emitted during initialization.
//
// logLevel must be a level name accepted by logrus' ParseLevel. An
// unrecognized name is reported at fatal level together with the list of
// valid levels; logrus exits the process after a fatal log entry.
func SetLogLevel(logLevel string) {
	level, err := log.ParseLevel(logLevel)
	if err != nil {
		// Fatalf with an explicit verb: Fatal joins its arguments like
		// fmt.Sprint, which adds no space after a string operand and so
		// glued the level list directly onto the colon.
		log.WithError(err).Fatalf("Failed to set log level. Valid log levels are: %v", log.AllLevels)
	}
	log.SetLevel(level)
	log.SetFormatter(&logging.InternalFormatter{})
}
// SetInternalLogOutput passes w to logging.SetOutput.
// NOTE(review): presumably this redirects internal log output to w — confirm
// against the logging package.
func SetInternalLogOutput(w io.Writer) {
logging.SetOutput(w)
}
// signalHandler blocks until the process receives SIGINT or SIGTERM, logs the
// signal, runs every shutdown function in slice order, and finally calls
// cancel so that the Runtime API server can terminate gracefully. It handles
// a single signal and then returns; it is meant to run in its own goroutine.
func signalHandler(cancel context.CancelFunc, shutdownFuncs []func()) {
	// Deferred so the context is cancelled only after all shutdown
	// functions have run.
	defer cancel()

	// signal.Notify does not block when delivering, so the channel needs
	// room for the one signal we wait for.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	received := <-sigCh
	log.WithField("signal", received.String()).Info("Received signal")

	for i := range shutdownFuncs {
		shutdownFuncs[i]()
	}
}