in lambda/core/directinvoke/directinvoke.go [286:341]
// sendStreamingInvokeResponse copies payload to w via asyncPayloadCopy and,
// once the copy has ended, sets the FunctionErrorTypeTrailer and
// FunctionErrorBodyTrailer keys on w's header map.
//
// If asyncPayloadCopy itself fails, an internal server error is rendered to w
// and that error is returned. Otherwise the function blocks until one of two
// things happens:
//
//   - the copy reports completion: both trailer values are read from trailers;
//     if the copy reported an error and trailers carried no error type, the
//     error type is derived from the copy error instead;
//   - a reset is received on interruptedResponseChan: the copy is cancelled,
//     request (when non-nil) is cancelled, the copy result is awaited, the
//     reset is annotated with the copy metrics and InvokeResponseMode, nil is
//     sent back on interruptedResponseChan, and the error type is derived from
//     the reset reason. The error body value stays empty on this path.
//
// In both cases the copy metrics, tagged with runtimeCalledResponse, are sent
// on sendResponseChan. The returned error is the copy error, if any.
func sendStreamingInvokeResponse(payload io.Reader, trailers http.Header, w http.ResponseWriter,
	interruptedResponseChan chan *interop.Reset, sendResponseChan chan *interop.InvokeResponseMetrics,
	request *interop.CancellableRequest, runtimeCalledResponse bool) (err error) {
	copyDone, cancel, err := asyncPayloadCopy(w, payload)
	if err != nil {
		renderInternalServerError(w, err.Error())
		return err
	}

	var (
		result    CopyDoneResult
		typeValue string
		bodyValue string
	)

	// NOTE(review): cancel is only invoked on the reset path below — confirm
	// that skipping it after a normal completion is intentional.
	select {
	case result = <-copyDone:
		// The copy ended on its own; prefer the trailers supplied by the caller.
		typeValue = trailers.Get(FunctionErrorTypeTrailer)
		bodyValue = trailers.Get(FunctionErrorBodyTrailer)
		if typeValue == "" && result.Error != nil {
			typeValue = string(mapCopyDoneResultErrorToErrorType(result.Error))
		}

	case reset := <-interruptedResponseChan:
		// A reset was initiated while the copy was still in flight.
		cancel()

		// When the runtime called /response the copy may be stuck in
		// Body.Read(); cancelling the request closes the underlying connection
		// to unblock it. For /error the body was already fully read by the
		// /error handler, so nothing more is needed there.
		if request == nil {
			log.Warn("Cannot close underlying connection. Request object is nil")
		} else if connErr := request.Cancel(); connErr != nil {
			log.Warnf("Failed to close underlying connection: %s", connErr)
		}

		// Wait for the copy to wind down before reporting back on the reset.
		result = <-copyDone
		reset.InvokeResponseMetrics = result.Metrics
		reset.InvokeResponseMode = InvokeResponseMode
		// Presumably an acknowledgement to whoever issued the reset — verify
		// against the sender side of this channel.
		interruptedResponseChan <- nil
		typeValue = string(getErrorTypeFromResetReason(reset.Reason))
	}

	w.Header().Set(FunctionErrorTypeTrailer, typeValue)
	w.Header().Set(FunctionErrorBodyTrailer, bodyValue)

	metrics := result.Metrics
	metrics.RuntimeCalledResponse = runtimeCalledResponse
	sendResponseChan <- metrics

	if copyErr := result.Error; copyErr != nil {
		log.Errorf("Error while streaming response payload: %s", copyErr)
		return copyErr
	}
	return nil
}