func sendStreamingInvokeResponse()

in lambda/core/directinvoke/directinvoke.go [286:341]


// sendStreamingInvokeResponse copies payload into w and, once the copy has
// ended, sets the 'Lambda-Runtime-Function-Error-Type' and
// 'Lambda-Runtime-Function-Error-Body' trailers on w.
//
// The copy is started through asyncPayloadCopy; if it cannot be started, an
// internal server error is rendered on w and the error is returned. Otherwise
// the function waits for whichever happens first:
//
//   - the copy finishes: the trailer values are taken from trailers, and when
//     the copy failed without an error type being supplied, the type is derived
//     from the copy error;
//   - a reset arrives on interruptedResponseChan: the copy is cancelled, the
//     request (if non-nil) is cancelled, the copy result is awaited, the reset
//     is populated with the copy metrics and the response mode, nil is sent back
//     on interruptedResponseChan, and the error type is derived from the reset
//     reason. The error body trailer is left empty in this case.
//
// In both cases runtimeCalledResponse is recorded on the copy metrics, which
// are then sent on sendResponseChan. The returned error is the copy error, if
// any.
func sendStreamingInvokeResponse(payload io.Reader, trailers http.Header, w http.ResponseWriter,
	interruptedResponseChan chan *interop.Reset, sendResponseChan chan *interop.InvokeResponseMetrics,
	request *interop.CancellableRequest, runtimeCalledResponse bool) (err error) {
	copyDone, cancel, err := asyncPayloadCopy(w, payload)
	if err != nil {
		renderInternalServerError(w, err.Error())
		return err
	}

	var (
		result    CopyDoneResult
		errorType string
		errorBody string
	)

	select {
	case result = <-copyDone:
		// The copy ended on its own: forward the supplied trailers.
		errorType = trailers.Get(FunctionErrorTypeTrailer)
		errorBody = trailers.Get(FunctionErrorBodyTrailer)
		if errorType == "" && result.Error != nil {
			// No explicit error type was supplied, so report the copy failure instead.
			errorType = string(mapCopyDoneResultErrorToErrorType(result.Error))
		}

	case reset := <-interruptedResponseChan:
		// A reset was initiated while the copy was still in flight.
		cancel()

		// When the runtime called /response, the copy may be stuck in Body.Read();
		// cancelling the request closes the underlying connection to interrupt it.
		// For /error the whole body has already been read by the /error handler,
		// so no additional handling is needed there.
		if request == nil {
			log.Warn("Cannot close underlying connection. Request object is nil")
		} else if connErr := request.Cancel(); connErr != nil {
			log.Warnf("Failed to close underlying connection: %s", connErr)
		}

		// Wait for the copy to wind down before reporting its metrics on the reset.
		result = <-copyDone
		reset.InvokeResponseMetrics = result.Metrics
		reset.InvokeResponseMode = InvokeResponseMode
		// NOTE(review): presumably signals the reset initiator that the reset
		// object has been populated — confirm against the sender side.
		interruptedResponseChan <- nil
		errorType = string(getErrorTypeFromResetReason(reset.Reason))
	}

	header := w.Header()
	header.Set(FunctionErrorTypeTrailer, errorType)
	header.Set(FunctionErrorBodyTrailer, errorBody)

	metrics := result.Metrics
	metrics.RuntimeCalledResponse = runtimeCalledResponse
	sendResponseChan <- metrics

	if copyErr := result.Error; copyErr != nil {
		log.Errorf("Error while streaming response payload: %s", copyErr)
		return copyErr
	}
	return nil
}