func doInvoke()

in lambda/rapid/handlers.go [406:532]


// doInvoke runs one invoke end to end inside xray.CaptureInvokeSegment /
// xray.WithErrorCause. In order it:
//
//  1. runs an inline init when execCtx.initDone is false (otherwise it may
//     send the init subsegment from previously recorded times),
//  2. sends the invoke start log event,
//  3. initializes the invoke flow barriers and, when extensions are enabled,
//     sets the number of agents subscribed to the invoke event,
//  4. installs the invoke renderer, releases the agents and the runtime, and
//     awaits the runtime response,
//  5. awaits runtime ready and emits the runtime-done event,
//  6. when execCtx.HasActiveExtensions() is true, signals runtime ready on the
//     interop server, awaits the agents and sends an "extensionOverhead" span.
//
// Parameters:
//   - execCtx: supplies the events API, xray tracer, invoke flow,
//     registration service, rendering service and interop server used here.
//   - invokeRequest: the invoke being processed; its ID becomes the current
//     request ID and its InvokeResponseMetrics feed the runtime-done event.
//   - mx: written to by this function (rendererMetrics and runtimeReadyTime).
//   - sbInfoFromInit: forwarded to doRuntimeDomainInit for inline init; its
//     SandboxType is also consulted when init has already completed.
//   - requestBuffer: forwarded to rendering.NewInvokeRenderer.
//
// The result is whatever xray.CaptureInvokeSegment returns. The wrapped
// closure returns early with the error from inline init, barrier
// initialization, SetAgentsReadyCount, the invoke subsegment, the overhead
// subsegment or AwaitAgentsReady, and nil otherwise.
// NOTE(review): presumably the xray wrappers propagate that error unchanged —
// confirm against the tracer implementation.
// Failures to send the runtime-done event or the report span are only logged.
func doInvoke(execCtx *rapidContext, invokeRequest *interop.Invoke, mx *invokeMetrics, sbInfoFromInit interop.SandboxInfoFromInit, requestBuffer *bytes.Buffer) error {
	execCtx.eventsAPI.SetCurrentRequestID(interop.RequestID(invokeRequest.ID))
	appCtx := execCtx.appCtx

	xray := execCtx.xray
	xray.Configure(invokeRequest)

	// No caller-supplied context exists; tracing starts from a fresh
	// background context.
	ctx := context.Background()

	return xray.CaptureInvokeSegment(ctx, xray.WithErrorCause(ctx, appCtx, func(ctx context.Context) error {
		// Built once up front so that both invoke start log paths below (init
		// failure and normal flow) report the same tracing context.
		telemetryTracingCtx := xray.BuildTracingCtxForStart()

		if !execCtx.initDone {
			// Init has not completed yet: run it inline, as part of this
			// invoke (lifecycle phase is reported as invoke).
			if err := xray.CaptureInitSubsegment(ctx, func(ctx context.Context) error {
				return doRuntimeDomainInit(execCtx, sbInfoFromInit, interop.LifecyclePhaseInvoke)
			}); err != nil {
				// The invoke start log event is still emitted before bailing
				// out on an init failure.
				sendInvokeStartLogEvent(execCtx, invokeRequest.ID, telemetryTracingCtx)
				return err
			}
		} else if sbInfoFromInit.SandboxType != interop.SandboxPreWarmed && !execCtx.initCachingEnabled {
			// Init already done, sandbox is not pre-warmed and init caching is
			// off. NOTE(review): by its name this reports the earlier init
			// using recorded timings, at most once — confirm in the tracer.
			xray.SendInitSubsegmentWithRecordedTimesOnce(ctx)
		}

		// Called unconditionally on every invoke that gets this far.
		// NOTE(review): the "Once" suffix suggests it is a no-op after the
		// first call — confirm in the tracer.
		xray.SendRestoreSubsegmentWithRecordedTimesOnce(ctx)

		sendInvokeStartLogEvent(execCtx, invokeRequest.ID, telemetryTracingCtx)

		invokeFlow := execCtx.invokeFlow
		log.Debug("Initialize invoke flow barriers")
		err := invokeFlow.InitializeBarriers()
		if err != nil {
			return err
		}

		registrationService := execCtx.registrationService
		runtime := registrationService.GetRuntime()
		// Left nil when extensions are disabled, so nothing is released for
		// agents in that case.
		var intAgents []*core.InternalAgent
		var extAgents []*core.ExternalAgent

		if extensions.AreEnabled() {
			// Only agents subscribed to the invoke event are counted.
			intAgents = registrationService.GetSubscribedInternalAgents(core.InvokeEvent)
			extAgents = registrationService.GetSubscribedExternalAgents(core.InvokeEvent)
			// NOTE(review): the combined count is narrowed to uint16; assumes
			// the number of subscribed agents always fits.
			if err := invokeFlow.SetAgentsReadyCount(uint16(len(intAgents) + len(extAgents))); err != nil {
				return err
			}
		}

		// Invoke
		if err := xray.CaptureInvokeSubsegment(ctx, xray.WithError(ctx, appCtx, func(ctx context.Context) error {
			log.Debug("Set renderer for invoke")
			renderer := rendering.NewInvokeRenderer(ctx, invokeRequest, requestBuffer, xray.BuildTracingHeader())
			// Deferred so the renderer metrics are captured on every exit from
			// this closure, including when AwaitRuntimeResponse fails.
			defer func() {
				mx.rendererMetrics = renderer.GetMetrics()
			}()

			// The renderer is installed before anything is released below.
			execCtx.renderingService.SetRenderer(renderer)
			if extensions.AreEnabled() {
				log.Debug("Release agents conditions")
				// External agents are released before internal ones.
				for _, agent := range extAgents {
					//TODO handle Supervisors listening channel
					agent.Release()
				}
				for _, agent := range intAgents {
					//TODO handle Supervisors listening channel
					agent.Release()
				}
			}

			log.Debug("Release runtime condition")
			//TODO handle Supervisors listening channel
			// The runtime start timestamp is recorded immediately before the
			// runtime is released; it is read again for the runtime-done event.
			execCtx.SetRuntimeStartedTime(metering.Monotime())
			runtime.Release()
			log.Debug("Await runtime response")
			//TODO handle Supervisors listening channel
			return invokeFlow.AwaitRuntimeResponse()
		})); err != nil {
			return err
		}

		// Runtime overhead
		if err := xray.CaptureOverheadSubsegment(ctx, func(ctx context.Context) error {
			log.Debug("Await runtime ready")
			// Overhead start is stamped once the runtime response has been
			// awaited successfully and before waiting for runtime ready.
			execCtx.SetRuntimeOverheadStartedTime(metering.Monotime())
			//TODO handle Supervisors listening channel
			return invokeFlow.AwaitRuntimeReady()
		}); err != nil {
			return err
		}
		// Stamped only after AwaitRuntimeReady returned without error.
		mx.runtimeReadyTime = metering.Monotime()

		// Runtime-done event for the success path, assembled from the
		// timestamps recorded above and the metrics carried on invokeRequest.
		runtimeDoneEventData := interop.InvokeRuntimeDoneData{
			Status:          telemetry.RuntimeDoneSuccess,
			Metrics:         telemetry.GetRuntimeDoneInvokeMetrics(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics, mx.runtimeReadyTime),
			InternalMetrics: invokeRequest.InvokeResponseMetrics,
			Tracing:         xray.BuildTracingCtxAfterInvokeComplete(),
			Spans:           execCtx.eventsAPI.GetRuntimeDoneSpans(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics, execCtx.RuntimeOverheadStartedTime, mx.runtimeReadyTime),
		}
		log.Info(runtimeDoneEventData.String())
		// Best effort: a failure to send the event is logged and does not
		// fail the invoke.
		if err := execCtx.eventsAPI.SendInvokeRuntimeDone(runtimeDoneEventData); err != nil {
			log.Errorf("Failed to send INVOKE RTDONE: %s", err)
		}

		// Extensions overhead
		if execCtx.HasActiveExtensions() {
			// Measures the time from signalling runtime ready until all agents
			// have reported ready.
			extensionOverheadStartTime := metering.Monotime()
			execCtx.interopServer.SendRuntimeReady()
			log.Debug("Await agents ready")
			//TODO handle Supervisors listening channel
			if err := invokeFlow.AwaitAgentsReady(); err != nil {
				log.Warnf("AwaitAgentsReady() = %s", err)
				return err
			}
			extensionOverheadEndTime := metering.Monotime()
			extensionOverheadMsSpan := interop.Span{
				Name:       "extensionOverhead",
				Start:      telemetry.GetEpochTimeInISO8601FormatFromMonotime(extensionOverheadStartTime),
				DurationMs: telemetry.CalculateDuration(extensionOverheadStartTime, extensionOverheadEndTime),
			}
			// Best effort: a failure to send the span is logged and does not
			// fail the invoke.
			if err := execCtx.eventsAPI.SendReportSpan(extensionOverheadMsSpan); err != nil {
				log.WithError(err).Error("Failed to create REPORT Span")
			}
		}

		return nil
	}))
}