in lambda/rapid/handlers.go [406:532]
func doInvoke(execCtx *rapidContext, invokeRequest *interop.Invoke, mx *invokeMetrics, sbInfoFromInit interop.SandboxInfoFromInit, requestBuffer *bytes.Buffer) error {
execCtx.eventsAPI.SetCurrentRequestID(interop.RequestID(invokeRequest.ID))
appCtx := execCtx.appCtx
xray := execCtx.xray
xray.Configure(invokeRequest)
ctx := context.Background()
return xray.CaptureInvokeSegment(ctx, xray.WithErrorCause(ctx, appCtx, func(ctx context.Context) error {
telemetryTracingCtx := xray.BuildTracingCtxForStart()
if !execCtx.initDone {
// do inline init
if err := xray.CaptureInitSubsegment(ctx, func(ctx context.Context) error {
return doRuntimeDomainInit(execCtx, sbInfoFromInit, interop.LifecyclePhaseInvoke)
}); err != nil {
sendInvokeStartLogEvent(execCtx, invokeRequest.ID, telemetryTracingCtx)
return err
}
} else if sbInfoFromInit.SandboxType != interop.SandboxPreWarmed && !execCtx.initCachingEnabled {
xray.SendInitSubsegmentWithRecordedTimesOnce(ctx)
}
xray.SendRestoreSubsegmentWithRecordedTimesOnce(ctx)
sendInvokeStartLogEvent(execCtx, invokeRequest.ID, telemetryTracingCtx)
invokeFlow := execCtx.invokeFlow
log.Debug("Initialize invoke flow barriers")
err := invokeFlow.InitializeBarriers()
if err != nil {
return err
}
registrationService := execCtx.registrationService
runtime := registrationService.GetRuntime()
var intAgents []*core.InternalAgent
var extAgents []*core.ExternalAgent
if extensions.AreEnabled() {
intAgents = registrationService.GetSubscribedInternalAgents(core.InvokeEvent)
extAgents = registrationService.GetSubscribedExternalAgents(core.InvokeEvent)
if err := invokeFlow.SetAgentsReadyCount(uint16(len(intAgents) + len(extAgents))); err != nil {
return err
}
}
// Invoke
if err := xray.CaptureInvokeSubsegment(ctx, xray.WithError(ctx, appCtx, func(ctx context.Context) error {
log.Debug("Set renderer for invoke")
renderer := rendering.NewInvokeRenderer(ctx, invokeRequest, requestBuffer, xray.BuildTracingHeader())
defer func() {
mx.rendererMetrics = renderer.GetMetrics()
}()
execCtx.renderingService.SetRenderer(renderer)
if extensions.AreEnabled() {
log.Debug("Release agents conditions")
for _, agent := range extAgents {
//TODO handle Supervisors listening channel
agent.Release()
}
for _, agent := range intAgents {
//TODO handle Supervisors listening channel
agent.Release()
}
}
log.Debug("Release runtime condition")
//TODO handle Supervisors listening channel
execCtx.SetRuntimeStartedTime(metering.Monotime())
runtime.Release()
log.Debug("Await runtime response")
//TODO handle Supervisors listening channel
return invokeFlow.AwaitRuntimeResponse()
})); err != nil {
return err
}
// Runtime overhead
if err := xray.CaptureOverheadSubsegment(ctx, func(ctx context.Context) error {
log.Debug("Await runtime ready")
execCtx.SetRuntimeOverheadStartedTime(metering.Monotime())
//TODO handle Supervisors listening channel
return invokeFlow.AwaitRuntimeReady()
}); err != nil {
return err
}
mx.runtimeReadyTime = metering.Monotime()
runtimeDoneEventData := interop.InvokeRuntimeDoneData{
Status: telemetry.RuntimeDoneSuccess,
Metrics: telemetry.GetRuntimeDoneInvokeMetrics(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics, mx.runtimeReadyTime),
InternalMetrics: invokeRequest.InvokeResponseMetrics,
Tracing: xray.BuildTracingCtxAfterInvokeComplete(),
Spans: execCtx.eventsAPI.GetRuntimeDoneSpans(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics, execCtx.RuntimeOverheadStartedTime, mx.runtimeReadyTime),
}
log.Info(runtimeDoneEventData.String())
if err := execCtx.eventsAPI.SendInvokeRuntimeDone(runtimeDoneEventData); err != nil {
log.Errorf("Failed to send INVOKE RTDONE: %s", err)
}
// Extensions overhead
if execCtx.HasActiveExtensions() {
extensionOverheadStartTime := metering.Monotime()
execCtx.interopServer.SendRuntimeReady()
log.Debug("Await agents ready")
//TODO handle Supervisors listening channel
if err := invokeFlow.AwaitAgentsReady(); err != nil {
log.Warnf("AwaitAgentsReady() = %s", err)
return err
}
extensionOverheadEndTime := metering.Monotime()
extensionOverheadMsSpan := interop.Span{
Name: "extensionOverhead",
Start: telemetry.GetEpochTimeInISO8601FormatFromMonotime(extensionOverheadStartTime),
DurationMs: telemetry.CalculateDuration(extensionOverheadStartTime, extensionOverheadEndTime),
}
if err := execCtx.eventsAPI.SendReportSpan(extensionOverheadMsSpan); err != nil {
log.WithError(err).Error("Failed to create REPORT Span")
}
}
return nil
}))
}