func handleInvoke()

in lambda/rapid/handlers.go [638:691]


func handleInvoke(execCtx *rapidContext, invokeRequest *interop.Invoke, sbInfoFromInit interop.SandboxInfoFromInit, requestBuffer *bytes.Buffer, responseSender interop.InvokeResponseSender) (interop.InvokeSuccess, *interop.InvokeFailure) {
	appctx.StoreResponseSender(execCtx.appCtx, responseSender)
	invokeMx := invokeMetrics{}

	if err := doInvoke(execCtx, invokeRequest, &invokeMx, sbInfoFromInit, requestBuffer); err != nil {
		log.WithError(err).WithField("InvokeID", invokeRequest.ID).Error("Invoke failed")
		invokeFailure := handleInvokeError(execCtx, invokeRequest, &invokeMx, err)
		invokeFailure.InvokeResponseMode = invokeRequest.InvokeResponseMode

		if invokeRequest.InvokeResponseMetrics != nil && interop.IsResponseStreamingMetrics(invokeRequest.InvokeResponseMetrics) {
			invokeFailure.ResponseMetrics = interop.ResponseMetrics{
				RuntimeResponseLatencyMs:     telemetry.CalculateDuration(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics.StartReadingResponseMonoTimeMs),
				RuntimeTimeThrottledMs:       invokeRequest.InvokeResponseMetrics.TimeShapedNs / int64(time.Millisecond),
				RuntimeProducedBytes:         invokeRequest.InvokeResponseMetrics.ProducedBytes,
				RuntimeOutboundThroughputBps: invokeRequest.InvokeResponseMetrics.OutboundThroughputBps,
			}
		}
		return interop.InvokeSuccess{}, invokeFailure
	}

	var invokeCompletionTimeNs int64
	if responseTimeNs := execCtx.registrationService.GetRuntime().GetRuntimeDescription().State.ResponseTimeNs; responseTimeNs != 0 {
		invokeCompletionTimeNs = time.Now().UnixNano() - responseTimeNs
	}

	invokeSuccessMsg := interop.InvokeSuccess{
		RuntimeRelease:      appctx.GetRuntimeRelease(execCtx.appCtx),
		NumActiveExtensions: execCtx.registrationService.CountAgents(),
		ExtensionNames:      execCtx.GetExtensionNames(),
		InvokeMetrics: interop.InvokeMetrics{
			InvokeRequestReadTimeNs: invokeMx.rendererMetrics.ReadTime.Nanoseconds(),
			InvokeRequestSizeBytes:  int64(invokeMx.rendererMetrics.SizeBytes),
			RuntimeReadyTime:        invokeMx.runtimeReadyTime,
		},
		InvokeCompletionTimeNs: invokeCompletionTimeNs,
		InvokeReceivedTime:     invokeRequest.InvokeReceivedTime,
		InvokeResponseMode:     invokeRequest.InvokeResponseMode,
	}

	if invokeRequest.InvokeResponseMetrics != nil && interop.IsResponseStreamingMetrics(invokeRequest.InvokeResponseMetrics) {
		invokeSuccessMsg.ResponseMetrics = interop.ResponseMetrics{
			RuntimeResponseLatencyMs:     telemetry.CalculateDuration(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics.StartReadingResponseMonoTimeMs),
			RuntimeTimeThrottledMs:       invokeRequest.InvokeResponseMetrics.TimeShapedNs / int64(time.Millisecond),
			RuntimeProducedBytes:         invokeRequest.InvokeResponseMetrics.ProducedBytes,
			RuntimeOutboundThroughputBps: invokeRequest.InvokeResponseMetrics.OutboundThroughputBps,
		}
	}

	if execCtx.telemetryAPIEnabled {
		invokeSuccessMsg.LogsAPIMetrics = interop.MergeSubscriptionMetrics(execCtx.logsSubscriptionAPI.FlushMetrics(), execCtx.telemetrySubscriptionAPI.FlushMetrics())
	}

	return invokeSuccessMsg, nil
}