in lambda/rapid/handlers.go [638:691]
func handleInvoke(execCtx *rapidContext, invokeRequest *interop.Invoke, sbInfoFromInit interop.SandboxInfoFromInit, requestBuffer *bytes.Buffer, responseSender interop.InvokeResponseSender) (interop.InvokeSuccess, *interop.InvokeFailure) {
appctx.StoreResponseSender(execCtx.appCtx, responseSender)
invokeMx := invokeMetrics{}
if err := doInvoke(execCtx, invokeRequest, &invokeMx, sbInfoFromInit, requestBuffer); err != nil {
log.WithError(err).WithField("InvokeID", invokeRequest.ID).Error("Invoke failed")
invokeFailure := handleInvokeError(execCtx, invokeRequest, &invokeMx, err)
invokeFailure.InvokeResponseMode = invokeRequest.InvokeResponseMode
if invokeRequest.InvokeResponseMetrics != nil && interop.IsResponseStreamingMetrics(invokeRequest.InvokeResponseMetrics) {
invokeFailure.ResponseMetrics = interop.ResponseMetrics{
RuntimeResponseLatencyMs: telemetry.CalculateDuration(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics.StartReadingResponseMonoTimeMs),
RuntimeTimeThrottledMs: invokeRequest.InvokeResponseMetrics.TimeShapedNs / int64(time.Millisecond),
RuntimeProducedBytes: invokeRequest.InvokeResponseMetrics.ProducedBytes,
RuntimeOutboundThroughputBps: invokeRequest.InvokeResponseMetrics.OutboundThroughputBps,
}
}
return interop.InvokeSuccess{}, invokeFailure
}
var invokeCompletionTimeNs int64
if responseTimeNs := execCtx.registrationService.GetRuntime().GetRuntimeDescription().State.ResponseTimeNs; responseTimeNs != 0 {
invokeCompletionTimeNs = time.Now().UnixNano() - responseTimeNs
}
invokeSuccessMsg := interop.InvokeSuccess{
RuntimeRelease: appctx.GetRuntimeRelease(execCtx.appCtx),
NumActiveExtensions: execCtx.registrationService.CountAgents(),
ExtensionNames: execCtx.GetExtensionNames(),
InvokeMetrics: interop.InvokeMetrics{
InvokeRequestReadTimeNs: invokeMx.rendererMetrics.ReadTime.Nanoseconds(),
InvokeRequestSizeBytes: int64(invokeMx.rendererMetrics.SizeBytes),
RuntimeReadyTime: invokeMx.runtimeReadyTime,
},
InvokeCompletionTimeNs: invokeCompletionTimeNs,
InvokeReceivedTime: invokeRequest.InvokeReceivedTime,
InvokeResponseMode: invokeRequest.InvokeResponseMode,
}
if invokeRequest.InvokeResponseMetrics != nil && interop.IsResponseStreamingMetrics(invokeRequest.InvokeResponseMetrics) {
invokeSuccessMsg.ResponseMetrics = interop.ResponseMetrics{
RuntimeResponseLatencyMs: telemetry.CalculateDuration(execCtx.RuntimeStartedTime, invokeRequest.InvokeResponseMetrics.StartReadingResponseMonoTimeMs),
RuntimeTimeThrottledMs: invokeRequest.InvokeResponseMetrics.TimeShapedNs / int64(time.Millisecond),
RuntimeProducedBytes: invokeRequest.InvokeResponseMetrics.ProducedBytes,
RuntimeOutboundThroughputBps: invokeRequest.InvokeResponseMetrics.OutboundThroughputBps,
}
}
if execCtx.telemetryAPIEnabled {
invokeSuccessMsg.LogsAPIMetrics = interop.MergeSubscriptionMetrics(execCtx.logsSubscriptionAPI.FlushMetrics(), execCtx.telemetrySubscriptionAPI.FlushMetrics())
}
return invokeSuccessMsg, nil
}