lambda/rapid/sandbox.go (119 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package rapid
import (
"bytes"
"context"
"fmt"
"io"
"sync"
"go.amzn.com/lambda/appctx"
"go.amzn.com/lambda/core"
"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/rapi"
"go.amzn.com/lambda/rapi/rendering"
supvmodel "go.amzn.com/lambda/supervisor/model"
"go.amzn.com/lambda/telemetry"
)
// Sandbox bundles the externally supplied implementations and feature flags
// that Start reads when it assembles a Rapid execution context.
type Sandbox struct {
	// EnableTelemetryAPI indicates if the /telemetry and /logs endpoint HTTP
	// handlers must be mounted. StandaloneMode indicates if Rapid is being
	// called by Rapid Core's standalone HTTP frontend.
	EnableTelemetryAPI, StandaloneMode bool

	// InteropServer is the legacy interface for sending internal protocol
	// messages; Start also stores it in the application context.
	InteropServer interop.Server

	// Tracer handles trace segments generated by the platform.
	Tracer telemetry.Tracer

	// LogsSubscriptionAPI and TelemetrySubscriptionAPI back the /logs and
	// /telemetry endpoints of the Runtime API.
	LogsSubscriptionAPI, TelemetrySubscriptionAPI telemetry.SubscriptionAPI

	// LogsEgressAPI handles stdout/stderr logs from extension & runtime
	// processes.
	LogsEgressAPI telemetry.StdLogsEgressAPI

	// RuntimeStdoutWriter and RuntimeStderrWriter are not read by Start in
	// this file.
	// NOTE(review): presumably sinks for the runtime's stdout/stderr - confirm
	// against the code that consumes them.
	RuntimeStdoutWriter, RuntimeStderrWriter io.Writer

	// Handler is not read by Start in this file.
	// NOTE(review): presumably the function handler name - confirm.
	Handler string

	// EventsAPI handles platform log events emitted by Rapid (e.g.
	// RuntimeDone, InitStart).
	EventsAPI interop.EventsAPI

	// InitCachingEnabled indicates if handlers must run Init Caching specific
	// logic.
	InitCachingEnabled bool

	// Supervisor performs container construction & process management.
	Supervisor supvmodel.ProcessSupervisor

	// RuntimeFsRootPath is the path to the root of the domain within the root
	// mnt namespace. Required to find extensions.
	RuntimeFsRootPath string

	// RuntimeAPIHost and RuntimeAPIPort are handed to rapi.NewServer when the
	// Runtime API server is created.
	RuntimeAPIHost string
	RuntimeAPIPort int
}
// Start builds the internal state objects used by the Rapid handlers, creates
// the Runtime API server and launches it in a background goroutine.
//
// The caller configures the following through s:
//   - Supervisor implementation: performs container construction & process management
//   - Telemetry API and Logs API implementation: handling /logs and /telemetry of Runtime API
//   - Events API implementation: handles platform log events emitted by Rapid (e.g. RuntimeDone, InitStart)
//   - Logs Egress implementation: handling stdout/stderr logs from extension & runtime processes (TODO: remove & unify with Supervisor)
//   - Tracer implementation: handling trace segments generated by platform (TODO: remove & unify with Events API)
//   - InteropServer implementation: legacy interface for sending internal protocol messages, today only RuntimeReady remains (TODO: move RuntimeReady outside Core)
//   - Feature flags:
//   - StandaloneMode: indicates if being called by Rapid Core's standalone HTTP frontend (TODO: remove after unifying error reporting)
//   - InitCachingEnabled: indicates if handlers must run Init Caching specific logic
//   - EnableTelemetryAPI: indicates if /telemetry and /logs endpoint HTTP handlers must be mounted
//
// ctx is passed to the goroutine running the Runtime API HTTP server and is
// used to gracefully terminate that server on exit.
//
// Start returns the execution context, a getter for the internal state
// descriptor, and the Runtime API address formatted as "host:port".
func Start(ctx context.Context, s *Sandbox) (interop.RapidContext, interop.InternalStateGetter, string) {
	// Internal state objects shared by the Rapid handlers.
	var (
		appCtx     = appctx.NewApplicationContext()
		initFlow   = core.NewInitFlowSynchronization()
		invokeFlow = core.NewInvokeFlowSynchronization()
	)
	registrationService := core.NewRegistrationService(initFlow, invokeFlow)
	renderingService := rendering.NewRenderingService()
	credentialsService := core.NewCredentialsService()

	// The init type is stored before the server is constructed.
	appctx.StoreInitType(appCtx, s.InitCachingEnabled)

	server := rapi.NewServer(
		s.RuntimeAPIHost,
		s.RuntimeAPIPort,
		appCtx,
		registrationService,
		renderingService,
		s.EnableTelemetryAPI,
		s.LogsSubscriptionAPI,
		s.TelemetrySubscriptionAPI,
		credentialsService,
	)
	runtimeAPIAddr := fmt.Sprintf("%s:%d", server.Host(), server.Port())

	// TODO: pass this directly down to HTTP servers and handlers, instead of
	// using global state to share the interop server implementation
	appctx.StoreInteropServer(appCtx, s.InteropServer)

	supervisor := processSupervisor{
		ProcessSupervisor: s.Supervisor,
		RootPath:          s.RuntimeFsRootPath,
	}

	rc := &rapidContext{
		// Externally specified configurations (i.e. via SandboxBuilder)
		telemetryAPIEnabled:      s.EnableTelemetryAPI,
		logsSubscriptionAPI:      s.LogsSubscriptionAPI,
		telemetrySubscriptionAPI: s.TelemetrySubscriptionAPI,
		logsEgressAPI:            s.LogsEgressAPI,
		interopServer:            s.InteropServer,
		xray:                     s.Tracer,
		standaloneMode:           s.StandaloneMode,
		eventsAPI:                s.EventsAPI,
		initCachingEnabled:       s.InitCachingEnabled,
		supervisor:               supervisor,

		// Internally initialized configurations
		server:                server,
		appCtx:                appCtx,
		initDone:              false,
		initFlow:              initFlow,
		invokeFlow:            invokeFlow,
		registrationService:   registrationService,
		renderingService:      renderingService,
		credentialsService:    credentialsService,
		handlerExecutionMutex: sync.Mutex{},
		shutdownContext:       newShutdownContext(),

		// Both timestamps start at -1; the metrics pointer starts nil.
		RuntimeStartedTime:         -1,
		RuntimeOverheadStartedTime: -1,
		InvokeResponseMetrics:      nil,
	}

	go startRuntimeAPI(ctx, rc)

	stateGetter := registrationService.GetInternalStateDescriptor(appCtx)
	return rc, stateGetter, runtimeAPIAddr
}
// HandleInit runs handleInit while holding handlerExecutionMutex, so it never
// overlaps with another handler that takes the same mutex. Results are
// delivered through the two response channels handed to handleInit.
func (r *rapidContext) HandleInit(init *interop.Init, initSuccessResponseChan chan<- interop.InitSuccess, initFailureResponseChan chan<- interop.InitFailure) {
	mu := &r.handlerExecutionMutex
	mu.Lock()
	defer mu.Unlock()

	handleInit(r, init, initSuccessResponseChan, initFailureResponseChan)
}
// HandleInvoke runs handleInvoke while holding handlerExecutionMutex. Before
// delegating it removes the invoke error trace data left in the application
// context by the previous invoke.
func (r *rapidContext) HandleInvoke(invoke *interop.Invoke, sbInfoFromInit interop.SandboxInfoFromInit, requestBuffer *bytes.Buffer, responseSender interop.InvokeResponseSender) (interop.InvokeSuccess, *interop.InvokeFailure) {
	mu := &r.handlerExecutionMutex
	mu.Lock()
	defer mu.Unlock()

	// Drop the context entry used by the last invoke.
	r.appCtx.Delete(appctx.AppCtxInvokeErrorTraceDataKey)

	success, failure := handleInvoke(r, invoke, sbInfoFromInit, requestBuffer, responseSender)
	return success, failure
}
// HandleReset cancels any in-flight init/invoke flows, waits for the running
// handler to release handlerExecutionMutex, and then delegates to handleReset
// with the currently recorded runtime start time and invoke response metrics.
func (r *rapidContext) HandleReset(reset *interop.Reset) (interop.ResetSuccess, *interop.ResetFailure) {
	// A Reset arriving during init/invoke cancels the execution flows with
	// errResetReceived. That error is special-cased: the init/invoke
	// (unexpected) error handling functions do not handle it.
	r.registrationService.CancelFlows(errResetReceived)

	// Block until invoke error handling has returned.
	mu := &r.handlerExecutionMutex
	mu.Lock()
	defer mu.Unlock()

	// Drop the context entry used by the last invoke.
	r.appCtx.Delete(appctx.AppCtxInvokeErrorTraceDataKey)

	startedAt, metrics := r.RuntimeStartedTime, r.InvokeResponseMetrics
	success, failure := handleReset(r, reset, startedAt, metrics)
	return success, failure
}
// HandleShutdown waits for handlerExecutionMutex and then delegates to
// handleShutdown with standaloneShutdownReason.
//
// Unlike HandleReset, no flows are cancelled first, so acquiring the mutex
// can block forever if the handler currently holding it never returns.
func (r *rapidContext) HandleShutdown(shutdown *interop.Shutdown) interop.ShutdownSuccess {
	// Block until invoke error handling has returned.
	mu := &r.handlerExecutionMutex
	mu.Lock()
	defer mu.Unlock()

	result := handleShutdown(r, shutdown, standaloneShutdownReason)
	return result
}
// HandleRestore delegates to handleRestore and returns its result and error
// unchanged. It does not acquire handlerExecutionMutex.
func (r *rapidContext) HandleRestore(restore *interop.Restore) (interop.RestoreResult, error) {
	result, err := handleRestore(r, restore)
	return result, err
}
// Clear delegates to reinitialize for this context.
// NOTE(review): reinitialize is defined outside this file; presumably it
// resets per-sandbox state so the context can be reused - confirm.
func (r *rapidContext) Clear() {
	reinitialize(r)
}
// SetRuntimeStartedTime records runtimeStartedTime on the context; HandleReset
// later forwards the stored value to handleReset. The write is not guarded by
// handlerExecutionMutex.
func (r *rapidContext) SetRuntimeStartedTime(runtimeStartedTime int64) {
	r.RuntimeStartedTime = runtimeStartedTime
}
// SetRuntimeOverheadStartedTime records runtimeOverheadStartedTime on the
// context, replacing the -1 placeholder assigned by Start. The write is not
// guarded by handlerExecutionMutex.
func (r *rapidContext) SetRuntimeOverheadStartedTime(runtimeOverheadStartedTime int64) {
	r.RuntimeOverheadStartedTime = runtimeOverheadStartedTime
}
// SetInvokeResponseMetrics stores the metrics pointer on the context;
// HandleReset later forwards the stored pointer to handleReset. The write is
// not guarded by handlerExecutionMutex.
func (r *rapidContext) SetInvokeResponseMetrics(metrics *interop.InvokeResponseMetrics) {
	r.InvokeResponseMetrics = metrics
}
// SetLogStreamName stores logStreamName on the context. The write is not
// guarded by handlerExecutionMutex.
func (r *rapidContext) SetLogStreamName(logStreamName string) {
	r.logStreamName = logStreamName
}
// SetEventsAPI replaces the Events API implementation held by the context
// (initially taken from Sandbox.EventsAPI in Start). The write is not guarded
// by handlerExecutionMutex.
func (r *rapidContext) SetEventsAPI(eventsAPI interop.EventsAPI) {
	r.eventsAPI = eventsAPI
}