in lambda/rapidcore/server.go [291:349]
// sendResponseUnsafe writes the runtime's response for invokeID to the reply
// stream of the current invoke context.
//
// Validation, in order:
//   - interop.ErrInvalidInvokeID is returned when there is no invoke context or
//     its token's InvokeID differs from invokeID;
//   - interop.ErrResponseSent is returned when a reply was already sent;
//   - a plain error is returned when the context has no ReplyStream.
//
// Direct invokes hand the payload to directinvoke.SendDirectInvokeResponse. A
// failure there is logged and returned, but the context is still marked as
// replied and its Direct flag is cleared.
//
// All other invokes are buffered: the payload is read completely, rejected with
// *interop.ErrorResponseTooLarge when it exceeds interop.MaxPayloadSize, written
// to the reply stream, and its metrics are sent on s.sendResponseChan. Read,
// size, and write failures on this path return early and leave the context's
// ReplySent and Direct fields untouched. Read and write failures wrap the
// underlying error, so it stays reachable through errors.Is / errors.As.
//
// NOTE(review): this function takes no lock itself; the "Unsafe" suffix
// presumably means the caller is expected to serialize access — confirm
// against the callers.
func (s *Server) sendResponseUnsafe(invokeID string, additionalHeaders map[string]string, payload io.Reader, trailers http.Header, request *interop.CancellableRequest, runtimeCalledResponse bool) error {
	if s.invokeCtx == nil || invokeID != s.invokeCtx.Token.InvokeID {
		return interop.ErrInvalidInvokeID
	}
	if s.invokeCtx.ReplySent {
		return interop.ErrResponseSent
	}
	if s.invokeCtx.ReplyStream == nil {
		return fmt.Errorf("ReplyStream not available")
	}

	// Only the direct-invoke path defers its error: it must still fall through
	// to the bookkeeping at the bottom of the function.
	var reportedErr error
	if s.invokeCtx.Direct {
		if err := directinvoke.SendDirectInvokeResponse(additionalHeaders, payload, trailers, s.invokeCtx.ReplyStream, s.interruptedResponseChan, s.sendResponseChan, request, runtimeCalledResponse, invokeID); err != nil {
			// TODO: Do we need to drain the reader in case of a large payload and connection reuse?
			log.Errorf("Failed to write response to %s: %s", invokeID, err)
			reportedErr = err
		}
	} else {
		data, err := io.ReadAll(payload)
		if err != nil {
			// %w keeps the reader's error in the chain; the message text is unchanged.
			return fmt.Errorf("Failed to read response on %s: %w", invokeID, err)
		}
		// The size check happens only after the whole payload has been buffered,
		// so ResponseSize reports the real payload length.
		if len(data) > interop.MaxPayloadSize {
			return &interop.ErrorResponseTooLarge{
				ResponseSize:    len(data),
				MaxResponseSize: interop.MaxPayloadSize,
			}
		}

		startReadingResponseMonoTimeMs := metering.Monotime()
		// The content type comes from additionalHeaders; a missing key (or a nil
		// map) yields an empty header value.
		s.invokeCtx.ReplyStream.Header().Add(directinvoke.ContentTypeHeader, additionalHeaders[directinvoke.ContentTypeHeader])
		written, err := s.invokeCtx.ReplyStream.Write(data)
		if err != nil {
			// %w keeps the writer's error in the chain; the message text is unchanged.
			return fmt.Errorf("Failed to write response to %s: %w", invokeID, err)
		}

		s.sendResponseChan <- &interop.InvokeResponseMetrics{
			ProducedBytes:                   int64(written),
			StartReadingResponseMonoTimeMs:  startReadingResponseMonoTimeMs,
			FinishReadingResponseMonoTimeMs: metering.Monotime(),
			// -1 is reported for the time-shaping and throughput fields on this
			// buffered path.
			TimeShapedNs:          int64(-1),
			OutboundThroughputBps: int64(-1),
			// FIXME:
			// The runtime tells whether the function response mode is streaming or not.
			// Ideally, we would want to use that value here. Since I'm just rebasing, I will leave
			// as-is, but we should use that instead of relying on our memory to set this here
			// because we "know" it's a streaming code path.
			FunctionResponseMode:  interop.FunctionResponseModeBuffered,
			RuntimeCalledResponse: runtimeCalledResponse,
		}
	}

	// Reached after a buffered reply succeeded or after any direct-invoke
	// attempt (successful or not): mark the invoke as replied and clear Direct.
	s.invokeCtx.ReplySent = true
	s.invokeCtx.Direct = false
	return reportedErr
}