in lambda/core/directinvoke/directinvoke.go [239:284]
// asyncPayloadCopy streams payload to w from a background goroutine and
// reports the outcome on the returned channel.
//
// The destination is the writer produced by NewStreamedResponseWriter(w). If
// that constructor fails, its error is replaced by
// *interop.ErrInternalPlatformError and both the channel and cancel are nil.
//
// On success the function returns immediately with:
//   - copyDone: an unbuffered channel on which exactly one CopyDoneResult is
//     sent once the copy has finished. Because the channel is unbuffered, the
//     goroutine stays blocked on the send until the caller receives from it.
//   - cancel: the cancel function obtained from NewStreamedResponseWriter. The
//     goroutine also invokes it itself, after the result has been delivered.
//
// Before the result is sent, the EndOfResponseTrailer header on w is set to
// one of EndOfResponseTruncated (the copy returned an error),
// EndOfResponseOversized (more than MaxDirectResponseSize bytes were copied
// while the limit is enabled) or EndOfResponseComplete. CopyDoneResult.Error
// is *interop.ErrTruncatedResponse, *interop.ErrorResponseTooLargeDI or nil
// respectively.
func asyncPayloadCopy(w http.ResponseWriter, payload io.Reader) (copyDone chan CopyDoneResult, cancel context.CancelFunc, err error) {
	streamWriter, cancel, err := NewStreamedResponseWriter(w)
	if err != nil {
		return nil, nil, &interop.ErrInternalPlatformError{}
	}

	copyDone = make(chan CopyDoneResult)

	// The copy runs in its own goroutine so the caller is not blocked by it.
	go func() {
		// A MaxDirectResponseSize of -1 means the payload size is unlimited.
		sizeLimited := MaxDirectResponseSize != -1

		src := payload
		if sizeLimited {
			// Allow one byte beyond the limit to be read: a response of exactly
			// MaxDirectResponseSize bytes is legal, so "oversized" is detected
			// as written > MaxDirectResponseSize.
			src = io.LimitReader(payload, MaxDirectResponseSize+1)
		}

		// FIXME: inject bandwidthlimiter as a dependency, so that we can mock it in tests
		written, copyErr := bandwidthlimiter.BandwidthLimitingCopy(streamWriter, src)

		// Pick the trailer value and the error to report. A copy failure takes
		// precedence over the size check.
		trailer := EndOfResponseComplete
		switch {
		case copyErr != nil:
			trailer = EndOfResponseTruncated
			copyErr = &interop.ErrTruncatedResponse{}
		case sizeLimited && written > MaxDirectResponseSize:
			trailer = EndOfResponseOversized
			copyErr = &interop.ErrorResponseTooLargeDI{
				ErrorResponseTooLarge: interop.ErrorResponseTooLarge{
					ResponseSize:    int(written),
					MaxResponseSize: int(MaxDirectResponseSize),
				},
			}
		}
		w.Header().Set(EndOfResponseTrailer, trailer)

		copyDone <- CopyDoneResult{
			Metrics: streamWriter.GetMetrics(),
			Error:   copyErr,
		}
		cancel() // free resources
	}()

	return copyDone, cancel, nil
}