lambda/rapidcore/sandbox_api.go (117 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package rapidcore
import (
"bytes"
"go.amzn.com/lambda/extensions"
"go.amzn.com/lambda/interop"
)
// SandboxContext and other structs form the implementation of the SandboxAPI
// interface defined in interop/sandbox_model.go, using the implementation of
// Init, Invoke and Reset handlers in rapid/sandbox.go
type SandboxContext struct {
rapidCtx interop.RapidContext
handler string
runtimeAPIAddress string
}
// initContext and its methods model the initialization lifecycle
// of the Sandbox, which persist across invocations
type initContext struct {
initSuccessChan chan interop.InitSuccess
initFailureChan chan interop.InitFailure
rapidCtx interop.RapidContext
sbInfoFromInit interop.SandboxInfoFromInit // contains data that needs to be persisted from init for suppressed inits during invoke
invokeRequestBuffer *bytes.Buffer // byte buffer used to store the invoke request rendered to runtime (reused until reset)
}
// invokeContext and its methods model the invocation lifecycle
type invokeContext struct {
rapidCtx interop.RapidContext
invokeRequestChan chan *interop.Invoke
invokeSuccessChan chan interop.InvokeSuccess
invokeFailureChan chan interop.InvokeFailure
sbInfoFromInit interop.SandboxInfoFromInit // contains data that needs to be persisted from init for suppressed inits during invoke
invokeRequestBuffer *bytes.Buffer // byte buffer used to store the invoke request rendered to runtime (reused until reset)
}
// Validate interface compliance
var _ interop.SandboxContext = (*SandboxContext)(nil)
var _ interop.InitContext = (*initContext)(nil)
var _ interop.InvokeContext = (*invokeContext)(nil)
// Init starts the runtime domain initialization in a separate goroutine.
// Return value indicates that init request has been accepted and started.
func (s SandboxContext) Init(init *interop.Init, timeoutMs int64) interop.InitContext {
initSuccessResponseChan := make(chan interop.InitSuccess)
initFailureResponseChan := make(chan interop.InitFailure)
if len(s.handler) > 0 {
init.EnvironmentVariables.SetHandler(s.handler)
}
init.EnvironmentVariables.StoreRuntimeAPIEnvironmentVariable(s.runtimeAPIAddress)
extensions.DisableViaMagicLayer()
// We start initialization handling in a separate goroutine so that control can be returned back to
// caller, which can do work (e.g. notifying further upstream that initialization has started), and
// and call initCtx.Wait() to wait async for completion of initialization phase.
go s.rapidCtx.HandleInit(init, initSuccessResponseChan, initFailureResponseChan)
sbMetadata := interop.SandboxInfoFromInit{
EnvironmentVariables: init.EnvironmentVariables,
SandboxType: init.SandboxType,
RuntimeBootstrap: init.Bootstrap,
}
return newInitContext(s.rapidCtx, sbMetadata, initSuccessResponseChan, initFailureResponseChan)
}
// Reset triggers a reset. In case of timeouts, the reset handler cancels all flows which triggers
// ongoing invoke handlers to return before proceeding with invoke
// TODO: move this method to the initialization context, since reset is conceptually on RT domain
func (s SandboxContext) Reset(reset *interop.Reset) (interop.ResetSuccess, *interop.ResetFailure) {
defer s.rapidCtx.Clear()
return s.rapidCtx.HandleReset(reset)
}
// Reset triggers a shutdown. This is similar to a reset, except that this is a terminal state
// and no further invokes are allowed
func (s SandboxContext) Shutdown(shutdown *interop.Shutdown) interop.ShutdownSuccess {
return s.rapidCtx.HandleShutdown(shutdown)
}
func (s SandboxContext) Restore(restore *interop.Restore) (interop.RestoreResult, error) {
return s.rapidCtx.HandleRestore(restore)
}
func (s *SandboxContext) SetRuntimeStartedTime(runtimeStartedTime int64) {
s.rapidCtx.SetRuntimeStartedTime(runtimeStartedTime)
}
func (s *SandboxContext) SetInvokeResponseMetrics(metrics *interop.InvokeResponseMetrics) {
s.rapidCtx.SetInvokeResponseMetrics(metrics)
}
func newInitContext(r interop.RapidContext, sbMetadata interop.SandboxInfoFromInit,
initSuccessChan chan interop.InitSuccess, initFailureChan chan interop.InitFailure) initContext {
// Invocation request buffer is initialized once per initialization
// to reduce memory usage & GC CPU time across invocations
var requestBuffer bytes.Buffer
return initContext{
initSuccessChan: initSuccessChan,
initFailureChan: initFailureChan,
rapidCtx: r,
sbInfoFromInit: sbMetadata,
invokeRequestBuffer: &requestBuffer,
}
}
// Wait awaits until initialization phase is complete, i.e. one of:
// - until all runtime domain process call /next
// - any one of the runtime domain processes exit (init failure)
// Timeout handling is managed upstream entirely
func (i initContext) Wait() (interop.InitSuccess, *interop.InitFailure) {
select {
case initSuccess, isOpen := <-i.initSuccessChan:
if !isOpen {
// If init has already suceeded, we return quickly
return interop.InitSuccess{}, nil
}
return initSuccess, nil
case initFailure, isOpen := <-i.initFailureChan:
if !isOpen {
// If init has already failed, we return quickly for init to be suppressed
return interop.InitSuccess{}, &initFailure
}
return interop.InitSuccess{}, &initFailure
}
}
// Reserve is used to initialize invoke-related state
func (i initContext) Reserve() interop.InvokeContext {
invokeRequestChan := make(chan *interop.Invoke)
invokeSuccessChan := make(chan interop.InvokeSuccess)
invokeFailureChan := make(chan interop.InvokeFailure)
return invokeContext{
rapidCtx: i.rapidCtx,
invokeRequestChan: invokeRequestChan,
invokeSuccessChan: invokeSuccessChan,
invokeFailureChan: invokeFailureChan,
sbInfoFromInit: i.sbInfoFromInit,
invokeRequestBuffer: i.invokeRequestBuffer,
}
}
// SendRequest starts the invocation request handling in a separate goroutine,
// i.e. sending the request payload via /next response,
// and waiting for the synchronization points
func (invCtx invokeContext) SendRequest(invoke *interop.Invoke, responseSender interop.InvokeResponseSender) {
// Invoke handling needs to be in a separate goroutine so that control can
// be returned immediately to calling goroutine, which can do work and
// asynchronously call invCtx.Wait() to await completion of the invoke phase
go func() {
// For suppressed inits, invoke needs the runtime and agent env vars
invokeSuccess, invokeFailure := invCtx.rapidCtx.HandleInvoke(invoke, invCtx.sbInfoFromInit, invCtx.invokeRequestBuffer, responseSender)
if invokeFailure != nil {
invCtx.invokeFailureChan <- *invokeFailure
} else {
invCtx.invokeSuccessChan <- invokeSuccess
}
}()
}
// Wait awaits invoke completion, i.e. one of the following cases:
// - until all runtime domain process call /next
// - until a process exit (that notifies upstream to trigger a reset due to "failure")
// - until a timeout (triggered by a reset from upstream due to "timeout")
func (invCtx invokeContext) Wait() (interop.InvokeSuccess, *interop.InvokeFailure) {
select {
case invokeSuccess := <-invCtx.invokeSuccessChan:
return invokeSuccess, nil
case invokeFailure := <-invCtx.invokeFailureChan:
return interop.InvokeSuccess{}, &invokeFailure
}
}