func doInvoke()

in lambda/rapid/start.go [262:359]


func doInvoke(ctx context.Context, execCtx *rapidContext, watchdog *core.Watchdog, invokeRequest *interop.Invoke, mx *invokeMetrics) error {
	execCtx.eventsAPI.SetCurrentRequestID(invokeRequest.ID)
	appCtx := execCtx.appCtx
	appctx.StoreErrorResponse(appCtx, nil)

	if invokeRequest.NeedDebugLogs {
		execCtx.debugTailLogger.Enable()
	} else {
		execCtx.debugTailLogger.Disable()
	}

	xray := execCtx.xray
	xray.Configure(invokeRequest)

	return xray.CaptureInvokeSegment(ctx, xray.WithErrorCause(ctx, appCtx, func(ctx context.Context) error {
		if !execCtx.initDone {
			// do inline init
			if err := xray.CaptureInitSubsegment(ctx, func(ctx context.Context) error {
				return doInit(ctx, execCtx, watchdog)
			}); err != nil {
				return err
			}
		} else if execCtx.startRequest.SandboxType != interop.SandboxPreWarmed {
			xray.SendInitSubsegmentWithRecordedTimesOnce(ctx)
		}

		invokeFlow := execCtx.invokeFlow
		log.Debug("Initialize invoke flow barriers")
		err := invokeFlow.InitializeBarriers()
		if err != nil {
			return err
		}

		registrationService := execCtx.registrationService
		runtime := registrationService.GetRuntime()
		var intAgents []*core.InternalAgent
		var extAgents []*core.ExternalAgent

		if extensions.AreEnabled() {
			intAgents = registrationService.GetSubscribedInternalAgents(core.InvokeEvent)
			extAgents = registrationService.GetSubscribedExternalAgents(core.InvokeEvent)
			if err := invokeFlow.SetAgentsReadyCount(uint16(len(intAgents) + len(extAgents))); err != nil {
				return err
			}
		}

		// Invoke
		if err := xray.CaptureInvokeSubsegment(ctx, xray.WithError(ctx, appCtx, func(ctx context.Context) error {
			log.Debug("Set renderer for invoke")
			renderer := rendering.NewInvokeRenderer(ctx, invokeRequest, xray.TracingHeaderParser())
			defer func() {
				mx.rendererMetrics = renderer.GetMetrics()
			}()

			execCtx.renderingService.SetRenderer(renderer)
			if extensions.AreEnabled() {
				log.Debug("Release agents conditions")
				for _, agent := range extAgents {
					agent.Release()
				}
				for _, agent := range intAgents {
					agent.Release()
				}
			}

			log.Debug("Release runtime condition")
			runtime.Release()
			log.Debug("Await runtime response")
			return invokeFlow.AwaitRuntimeResponse()
		})); err != nil {
			return err
		}

		// Runtime overhead
		if err := xray.CaptureOverheadSubsegment(ctx, func(ctx context.Context) error {
			log.Debug("Await runtime ready")
			return invokeFlow.AwaitRuntimeReady()
		}); err != nil {
			return err
		}
		mx.runtimeReadyTime = metering.Monotime()
		if err := execCtx.eventsAPI.SendRuntimeDone("success"); err != nil {
			log.Errorf("Failed to send RUNDONE: %s", err)
		}

		// Extensions overhead
		if execCtx.HasActiveExtensions() {
			execCtx.interopServer.SendRuntimeReady()
			log.Debug("Await agents ready")
			if err := invokeFlow.AwaitAgentsReady(); err != nil {
				log.Warnf("AwaitAgentsReady() = %s", err)
				return err
			}
		}

		return nil
	}))
}