lambda/rapi/handler/invocationresponse.go (100 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package handler
import (
"net/http"
"go.amzn.com/lambda/appctx"
"go.amzn.com/lambda/core"
"go.amzn.com/lambda/fatalerror"
"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/rapi/rendering"
"github.com/go-chi/chi"
log "github.com/sirupsen/logrus"
)
const (
StreamingFunctionResponseMode = "streaming"
)
type invocationResponseHandler struct {
registrationService core.RegistrationService
}
func (h *invocationResponseHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
appCtx := appctx.FromRequest(request)
server := appctx.LoadResponseSender(appCtx)
if server == nil {
log.Panic("Invalid state, cannot access interop server")
}
runtime := h.registrationService.GetRuntime()
if err := runtime.InvocationResponse(); err != nil {
log.Warn(err)
rendering.RenderForbiddenWithTypeMsg(writer, request, rendering.ErrorTypeInvalidStateTransition, StateTransitionFailedForRuntimeMessageFormat,
runtime.GetState().Name(), core.RuntimeInvocationResponseStateName, err)
return
}
invokeID := chi.URLParam(request, "awsrequestid")
headers := map[string]string{contentTypeHeader: request.Header.Get(contentTypeHeader)}
if functionResponseMode := request.Header.Get(functionResponseModeHeader); functionResponseMode != "" {
switch functionResponseMode {
case StreamingFunctionResponseMode:
headers[functionResponseModeHeader] = functionResponseMode
default:
errHeaders := interop.InvokeResponseHeaders{
ContentType: request.Header.Get(contentTypeHeader),
}
fnError := interop.FunctionError{Type: fatalerror.RuntimeInvalidResponseModeHeader}
response := &interop.ErrorInvokeResponse{
Headers: errHeaders,
FunctionError: fnError,
Payload: []byte{},
}
_ = server.SendErrorResponse(chi.URLParam(request, "awsrequestid"), response)
rendering.RenderInvalidFunctionResponseMode(writer, request)
return
}
}
response := &interop.StreamableInvokeResponse{
Headers: headers,
Payload: request.Body,
Trailers: request.Trailer,
Request: &interop.CancellableRequest{Request: request},
}
if err := server.SendResponse(invokeID, response); err != nil {
switch err := err.(type) {
case *interop.ErrorResponseTooLarge:
if server.SendErrorResponse(invokeID, err.AsErrorResponse()) != nil {
rendering.RenderInteropError(writer, request, err)
return
}
appctx.StoreInvokeErrorTraceData(appCtx, &interop.InvokeErrorTraceData{})
if err := runtime.ResponseSent(); err != nil {
log.Panic(err)
}
rendering.RenderRequestEntityTooLarge(writer, request)
return
case *interop.ErrorResponseTooLargeDI:
// in DirectInvoke case, the (truncated) response is already sent back to the caller
if err := runtime.ResponseSent(); err != nil {
log.Panic(err)
}
rendering.RenderRequestEntityTooLarge(writer, request)
return
case *interop.ErrTruncatedResponse:
if err := runtime.ResponseSent(); err != nil {
log.Panic(err)
}
rendering.RenderTruncatedHTTPRequestError(writer, request)
return
case *interop.ErrInternalPlatformError:
rendering.RenderInternalServerError(writer, request)
return
default:
rendering.RenderInteropError(writer, request, err)
return
}
}
if err := runtime.ResponseSent(); err != nil {
log.Panic(err)
}
rendering.RenderAccepted(writer, request)
}
// NewInvocationResponseHandler returns a new instance of http handler
// for serving /runtime/invocation/{awsrequestid}/response.
func NewInvocationResponseHandler(registrationService core.RegistrationService) http.Handler {
return &invocationResponseHandler{
registrationService: registrationService,
}
}