in lambda/rapid/start.go [262:359]
func doInvoke(ctx context.Context, execCtx *rapidContext, watchdog *core.Watchdog, invokeRequest *interop.Invoke, mx *invokeMetrics) error {
execCtx.eventsAPI.SetCurrentRequestID(invokeRequest.ID)
appCtx := execCtx.appCtx
appctx.StoreErrorResponse(appCtx, nil)
if invokeRequest.NeedDebugLogs {
execCtx.debugTailLogger.Enable()
} else {
execCtx.debugTailLogger.Disable()
}
xray := execCtx.xray
xray.Configure(invokeRequest)
return xray.CaptureInvokeSegment(ctx, xray.WithErrorCause(ctx, appCtx, func(ctx context.Context) error {
if !execCtx.initDone {
// do inline init
if err := xray.CaptureInitSubsegment(ctx, func(ctx context.Context) error {
return doInit(ctx, execCtx, watchdog)
}); err != nil {
return err
}
} else if execCtx.startRequest.SandboxType != interop.SandboxPreWarmed {
xray.SendInitSubsegmentWithRecordedTimesOnce(ctx)
}
invokeFlow := execCtx.invokeFlow
log.Debug("Initialize invoke flow barriers")
err := invokeFlow.InitializeBarriers()
if err != nil {
return err
}
registrationService := execCtx.registrationService
runtime := registrationService.GetRuntime()
var intAgents []*core.InternalAgent
var extAgents []*core.ExternalAgent
if extensions.AreEnabled() {
intAgents = registrationService.GetSubscribedInternalAgents(core.InvokeEvent)
extAgents = registrationService.GetSubscribedExternalAgents(core.InvokeEvent)
if err := invokeFlow.SetAgentsReadyCount(uint16(len(intAgents) + len(extAgents))); err != nil {
return err
}
}
// Invoke
if err := xray.CaptureInvokeSubsegment(ctx, xray.WithError(ctx, appCtx, func(ctx context.Context) error {
log.Debug("Set renderer for invoke")
renderer := rendering.NewInvokeRenderer(ctx, invokeRequest, xray.TracingHeaderParser())
defer func() {
mx.rendererMetrics = renderer.GetMetrics()
}()
execCtx.renderingService.SetRenderer(renderer)
if extensions.AreEnabled() {
log.Debug("Release agents conditions")
for _, agent := range extAgents {
agent.Release()
}
for _, agent := range intAgents {
agent.Release()
}
}
log.Debug("Release runtime condition")
runtime.Release()
log.Debug("Await runtime response")
return invokeFlow.AwaitRuntimeResponse()
})); err != nil {
return err
}
// Runtime overhead
if err := xray.CaptureOverheadSubsegment(ctx, func(ctx context.Context) error {
log.Debug("Await runtime ready")
return invokeFlow.AwaitRuntimeReady()
}); err != nil {
return err
}
mx.runtimeReadyTime = metering.Monotime()
if err := execCtx.eventsAPI.SendRuntimeDone("success"); err != nil {
log.Errorf("Failed to send RUNDONE: %s", err)
}
// Extensions overhead
if execCtx.HasActiveExtensions() {
execCtx.interopServer.SendRuntimeReady()
log.Debug("Await agents ready")
if err := invokeFlow.AwaitAgentsReady(); err != nil {
log.Warnf("AwaitAgentsReady() = %s", err)
return err
}
}
return nil
}))
}