func asyncPayloadCopy()

in lambda/core/directinvoke/directinvoke.go [239:284]


// asyncPayloadCopy starts a goroutine that copies payload into w through the
// writer returned by NewStreamedResponseWriter, and returns without waiting
// for the copy to finish.
//
// When the copy ends, the goroutine sets the EndOfResponseTrailer header on w
// to EndOfResponseTruncated, EndOfResponseOversized or EndOfResponseComplete,
// sends exactly one CopyDoneResult on the returned channel, and then invokes
// the cancel function itself. The channel is unbuffered, so the goroutine
// stays blocked on that send until the result is received.
//
// The returned cancel function is the one obtained from
// NewStreamedResponseWriter. If that constructor fails, the channel and the
// cancel function are nil and the error is an *interop.ErrInternalPlatformError;
// the underlying cause is not propagated.
func asyncPayloadCopy(w http.ResponseWriter, payload io.Reader) (copyDone chan CopyDoneResult, cancel context.CancelFunc, err error) {
	writer, release, err := NewStreamedResponseWriter(w)
	if err != nil {
		return nil, nil, &interop.ErrInternalPlatformError{}
	}

	done := make(chan CopyDoneResult)

	// The copy runs in its own goroutine; the caller is notified through done.
	go func() {
		src := payload

		// A MaxDirectResponseSize of -1 means the payload size is unlimited.
		limited := MaxDirectResponseSize != -1
		if limited {
			// Allow reading one byte past the limit: a response of exactly
			// MaxDirectResponseSize bytes is acceptable, so only a byte count
			// strictly greater than the limit marks the response as oversized.
			src = io.LimitReader(src, MaxDirectResponseSize+1)
		}

		// FIXME: inject bandwidthlimiter as a dependency, so that we can mock it in tests
		written, copyErr := bandwidthlimiter.BandwidthLimitingCopy(writer, src)

		switch {
		case copyErr != nil:
			// Any copy failure is reported as a truncated response.
			w.Header().Set(EndOfResponseTrailer, EndOfResponseTruncated)
			copyErr = &interop.ErrTruncatedResponse{}
		case limited && written > MaxDirectResponseSize:
			w.Header().Set(EndOfResponseTrailer, EndOfResponseOversized)
			copyErr = &interop.ErrorResponseTooLargeDI{
				ErrorResponseTooLarge: interop.ErrorResponseTooLarge{
					ResponseSize:    int(written),
					MaxResponseSize: int(MaxDirectResponseSize),
				},
			}
		default:
			w.Header().Set(EndOfResponseTrailer, EndOfResponseComplete)
		}

		done <- CopyDoneResult{
			Metrics: writer.GetMetrics(),
			Error:   copyErr,
		}
		release() // free resources
	}()

	return done, release, nil
}