in lambda/rapidcore/server.go [312:365]
// sendResponseUnsafe writes the response payload for invokeID to the reply
// stream of the current invoke, then signals sendResponseChan and marks the
// reply as sent.
//
// It returns interop.ErrInvalidInvokeID when there is no current invoke or
// invokeID does not match its token, interop.ErrResponseSent when a reply has
// already been sent, and a plain error when the reply stream is missing.
//
// The status argument is currently unused; see the TODO in the body.
//
// Direct invokes stream the payload through directinvoke. A failure there is
// logged and returned, but the reply is still marked as sent. Non-direct
// invokes buffer the whole payload, reject it with
// *interop.ErrorResponseTooLarge when it exceeds interop.MaxPayloadSize, and
// return early (without marking the reply as sent) on read or write failures.
//
// NOTE(review): the "Unsafe" suffix suggests callers are expected to hold
// whatever lock guards s.invokeCtx — confirm against the call sites.
func (s *Server) sendResponseUnsafe(invokeID string, contentType string, status int, payload io.Reader) error {
	if s.invokeCtx == nil || invokeID != s.invokeCtx.Token.InvokeID {
		return interop.ErrInvalidInvokeID
	}
	if s.invokeCtx.ReplySent {
		return interop.ErrResponseSent
	}
	if s.invokeCtx.ReplyStream == nil {
		return fmt.Errorf("ReplyStream not available")
	}

	// TODO: earlier, status was set to 500 if runtime called /invocation/error. However, the integration
	// tests do not differentiate between /invocation/error and /invocation/response, but they check the error type:
	// To identify user-errors, we should also allowlist custom errortypes and propagate them via headers.
	// s.invokeCtx.ReplyStream.WriteHeader(status)

	var reportedErr error
	if s.invokeCtx.Direct {
		if err := directinvoke.SendDirectInvokeResponse(map[string]string{"Content-Type": contentType}, payload, s.invokeCtx.ReplyStream); err != nil {
			// TODO: Do we need to drain the reader in case of a large payload and connection reuse?
			log.Errorf("Failed to write response to %s: %s", invokeID, err)
			// Only flush when the stream actually supports it: calling Flush
			// on the nil interface produced by a failed assertion would panic.
			if flusher, ok := s.invokeCtx.ReplyStream.(http.Flusher); ok {
				flusher.Flush()
			} else {
				log.Error("Failed to flush response")
			}
			// Remember the error but fall through so the invoke is still
			// marked as replied and the waiter on sendResponseChan is released.
			reportedErr = err
		}
	} else {
		data, err := ioutil.ReadAll(payload)
		if err != nil {
			return fmt.Errorf("Failed to read response on %s: %s", invokeID, err)
		}
		// The size limit is enforced before anything is written to the stream.
		if len(data) > interop.MaxPayloadSize {
			return &interop.ErrorResponseTooLarge{
				ResponseSize:    len(data),
				MaxResponseSize: interop.MaxPayloadSize,
			}
		}
		s.invokeCtx.ReplyStream.Header().Add("Content-Type", contentType)
		if _, err := s.invokeCtx.ReplyStream.Write(data); err != nil {
			return fmt.Errorf("Failed to write response to %s: %s", invokeID, err)
		}
	}

	s.sendResponseChan <- struct{}{}
	s.invokeCtx.ReplySent = true
	s.invokeCtx.Direct = false
	return reportedErr
}