func()

in lambda/rapidcore/server.go [291:349]


// sendResponseUnsafe delivers the runtime's response for invokeID to the
// ReplyStream of the invoke currently tracked in s.invokeCtx.
//
// The call is rejected with interop.ErrInvalidInvokeID when no invoke context
// exists or its token carries a different invoke ID, with
// interop.ErrResponseSent when a reply was already recorded, and with a plain
// error when the context has no ReplyStream.
//
// Buffered (non-direct) invokes have their payload read completely, checked
// against interop.MaxPayloadSize (*interop.ErrorResponseTooLarge when
// exceeded), written to the ReplyStream in one call, and their metrics
// published on s.sendResponseChan. Any failure on this path returns
// immediately and leaves ReplySent and Direct untouched.
//
// Direct invokes are handed to directinvoke.SendDirectInvokeResponse. A
// failure there is logged and returned, but the invoke is still marked as
// replied and Direct is still cleared.
//
// NOTE(review): the "Unsafe" suffix presumably means the caller is responsible
// for synchronizing access to the server state — confirm against call sites.
func (s *Server) sendResponseUnsafe(invokeID string, additionalHeaders map[string]string, payload io.Reader, trailers http.Header, request *interop.CancellableRequest, runtimeCalledResponse bool) error {
	switch {
	case s.invokeCtx == nil || s.invokeCtx.Token.InvokeID != invokeID:
		return interop.ErrInvalidInvokeID
	case s.invokeCtx.ReplySent:
		return interop.ErrResponseSent
	case s.invokeCtx.ReplyStream == nil:
		return fmt.Errorf("ReplyStream not available")
	}

	// Only the direct path can fail and still fall through to the bookkeeping
	// at the bottom; the buffered path bails out on every error.
	var sendErr error
	if !s.invokeCtx.Direct {
		body, readErr := io.ReadAll(payload)
		if readErr != nil {
			return fmt.Errorf("Failed to read response on %s: %s", invokeID, readErr)
		}
		if size := len(body); size > interop.MaxPayloadSize {
			return &interop.ErrorResponseTooLarge{
				ResponseSize:    size,
				MaxResponseSize: interop.MaxPayloadSize,
			}
		}

		// The start timestamp is taken after the payload has been read and
		// size-checked, immediately before the header and body are written.
		startMonoTimeMs := metering.Monotime()
		contentType := additionalHeaders[directinvoke.ContentTypeHeader]
		s.invokeCtx.ReplyStream.Header().Add(directinvoke.ContentTypeHeader, contentType)
		producedBytes, writeErr := s.invokeCtx.ReplyStream.Write(body)
		if writeErr != nil {
			return fmt.Errorf("Failed to write response to %s: %s", invokeID, writeErr)
		}

		metrics := &interop.InvokeResponseMetrics{
			ProducedBytes:                   int64(producedBytes),
			StartReadingResponseMonoTimeMs:  startMonoTimeMs,
			FinishReadingResponseMonoTimeMs: metering.Monotime(),
			// -1 is reported for both shaping time and throughput on this path.
			TimeShapedNs:          int64(-1),
			OutboundThroughputBps: int64(-1),
			// FIXME:
			// The response mode is hard-coded to buffered here. The runtime
			// reports the function response mode itself; that value should be
			// used instead of assuming the mode from the code path taken.
			FunctionResponseMode:  interop.FunctionResponseModeBuffered,
			RuntimeCalledResponse: runtimeCalledResponse,
		}
		s.sendResponseChan <- metrics
	} else {
		sendErr = directinvoke.SendDirectInvokeResponse(additionalHeaders, payload, trailers, s.invokeCtx.ReplyStream, s.interruptedResponseChan, s.sendResponseChan, request, runtimeCalledResponse, invokeID)
		if sendErr != nil {
			// TODO: Do we need to drain the reader in case of a large payload and connection reuse?
			log.Errorf("Failed to write response to %s: %s", invokeID, sendErr)
		}
	}

	// Record that a reply went out and clear the direct flag.
	s.invokeCtx.ReplySent = true
	s.invokeCtx.Direct = false
	return sendErr
}