func()

in lambda/rapidcore/server.go [312:365]


// sendResponseUnsafe writes the runtime's reply for invokeID to the reply
// stream of the current invocation context and signals s.sendResponseChan.
//
// NOTE(review): the "Unsafe" suffix presumably means the caller is responsible
// for synchronizing access to s.invokeCtx; nothing in this function locks.
//
// Parameters:
//   - invokeID: must match the invoke ID of the current invocation context.
//   - contentType: value sent as the Content-Type header of the reply.
//   - status: currently unused; the WriteHeader call is disabled (see TODO below).
//   - payload: the response body to forward.
//
// Returns:
//   - interop.ErrInvalidInvokeID if there is no invocation context or the ID
//     does not match it.
//   - interop.ErrResponseSent if a reply was already recorded for this invoke.
//   - an error if the context has no ReplyStream.
//   - on the non-direct path: an error if the payload cannot be read or
//     written, or *interop.ErrorResponseTooLarge if it exceeds
//     interop.MaxPayloadSize. In these cases the reply is NOT marked as sent.
//   - on the direct path: the error from directinvoke.SendDirectInvokeResponse,
//     if any. The reply is still marked as sent and the channel is still
//     signaled before that error is returned.
func (s *Server) sendResponseUnsafe(invokeID string, contentType string, status int, payload io.Reader) error {
	if s.invokeCtx == nil || invokeID != s.invokeCtx.Token.InvokeID {
		return interop.ErrInvalidInvokeID
	}

	if s.invokeCtx.ReplySent {
		return interop.ErrResponseSent
	}

	if s.invokeCtx.ReplyStream == nil {
		return fmt.Errorf("ReplyStream not available")
	}

	// TODO: earlier, status was set to 500 if runtime called /invocation/error. However, the integration
	// tests do not differentiate between /invocation/error and /invocation/response, but they check the error type.
	// To identify user-errors, we should also allowlist custom errortypes and propagate them via headers.

	// s.invokeCtx.ReplyStream.WriteHeader(status)

	var reportedErr error
	if s.invokeCtx.Direct {
		if err := directinvoke.SendDirectInvokeResponse(map[string]string{"Content-Type": contentType}, payload, s.invokeCtx.ReplyStream); err != nil {
			// TODO: Do we need to drain the reader in case of a large payload and connection reuse?
			log.Errorf("Failed to write response to %s: %s", invokeID, err)
			// Only flush when the stream actually supports it: calling Flush on
			// the nil interface left by a failed type assertion would panic.
			if flusher, ok := s.invokeCtx.ReplyStream.(http.Flusher); ok {
				flusher.Flush()
			} else {
				log.Error("Failed to flush response")
			}
			// Remember the failure, but fall through so the reply is still
			// recorded as sent and the waiter on sendResponseChan is notified.
			reportedErr = err
		}
	} else {
		// Buffer the whole payload so its size can be checked before anything
		// is written to the reply stream.
		data, err := ioutil.ReadAll(payload)
		if err != nil {
			return fmt.Errorf("Failed to read response on %s: %s", invokeID, err)
		}
		if len(data) > interop.MaxPayloadSize {
			return &interop.ErrorResponseTooLarge{
				ResponseSize:    len(data),
				MaxResponseSize: interop.MaxPayloadSize,
			}
		}

		s.invokeCtx.ReplyStream.Header().Add("Content-Type", contentType)
		if _, err := s.invokeCtx.ReplyStream.Write(data); err != nil {
			return fmt.Errorf("Failed to write response to %s: %s", invokeID, err)
		}
	}

	s.sendResponseChan <- struct{}{}
	s.invokeCtx.ReplySent = true
	s.invokeCtx.Direct = false
	return reportedErr
}