lambda/rapi/rendering/rendering.go (233 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package rendering import ( "bytes" "context" "encoding/json" "errors" "io" "net/http" "strconv" "sync" "time" "go.amzn.com/lambda/interop" "go.amzn.com/lambda/metering" "go.amzn.com/lambda/rapi/model" "github.com/google/uuid" log "github.com/sirupsen/logrus" ) const ( // ErrorTypeInternalServerError error type for internal server error ErrorTypeInternalServerError = "InternalServerError" // ErrorTypeInvalidStateTransition error type for invalid state transition ErrorTypeInvalidStateTransition = "InvalidStateTransition" // ErrorTypeInvalidRequestID error type for invalid request ID error ErrorTypeInvalidRequestID = "InvalidRequestID" // ErrorTypeRequestEntityTooLarge error type for payload too large ErrorTypeRequestEntityTooLarge = "RequestEntityTooLarge" // ErrorTypeTruncatedHTTPRequest error type for truncated HTTP request ErrorTypeTruncatedHTTPRequest = "TruncatedHTTPRequest" ) // ErrRenderingServiceStateNotSet returned when state not set var ErrRenderingServiceStateNotSet = errors.New("EventRenderingService state not set") // RendererState is renderer object state. type RendererState interface { RenderAgentEvent(w http.ResponseWriter, r *http.Request) error RenderRuntimeEvent(w http.ResponseWriter, r *http.Request) error } // EventRenderingService is a state machine for rendering runtime and agent API responses. type EventRenderingService struct { mutex *sync.RWMutex currentState RendererState } // NewRenderingService returns new EventRenderingService. func NewRenderingService() *EventRenderingService { return &EventRenderingService{ mutex: &sync.RWMutex{}, } } // SetRenderer set current state func (s *EventRenderingService) SetRenderer(state RendererState) { s.mutex.Lock() defer s.mutex.Unlock() s.currentState = state } // RenderAgentEvent delegates to state implementation. func (s *EventRenderingService) RenderAgentEvent(w http.ResponseWriter, r *http.Request) error { s.mutex.RLock() defer s.mutex.RUnlock() if s.currentState == nil { return ErrRenderingServiceStateNotSet } return s.currentState.RenderAgentEvent(w, r) } // RenderRuntimeEvent delegates to state implementation. func (s *EventRenderingService) RenderRuntimeEvent(w http.ResponseWriter, r *http.Request) error { s.mutex.RLock() defer s.mutex.RUnlock() if s.currentState == nil { return ErrRenderingServiceStateNotSet } return s.currentState.RenderRuntimeEvent(w, r) } type RestoreRenderer struct{} func NewRestoreRenderer() *RestoreRenderer { return &RestoreRenderer{} } func (s *RestoreRenderer) RenderRuntimeEvent(writer http.ResponseWriter, request *http.Request) error { writer.WriteHeader(http.StatusOK) return nil } func (s *RestoreRenderer) RenderAgentEvent(writer http.ResponseWriter, request *http.Request) error { return nil } // InvokeRendererMetrics contains metrics of invoke request type InvokeRendererMetrics struct { ReadTime time.Duration SizeBytes int } // InvokeRenderer knows how to render invoke event. type InvokeRenderer struct { ctx context.Context invoke *interop.Invoke tracingHeaderParser func(context.Context) string requestBuffer *bytes.Buffer requestMutex sync.Mutex metrics InvokeRendererMetrics } // NewInvokeRenderer returns new invoke event renderer func NewInvokeRenderer(ctx context.Context, invoke *interop.Invoke, requestBuffer *bytes.Buffer, traceParser func(context.Context) string) *InvokeRenderer { requestBuffer.Reset() // clear request buffer, since this can be reused across invokes return &InvokeRenderer{ invoke: invoke, ctx: ctx, tracingHeaderParser: traceParser, requestBuffer: requestBuffer, requestMutex: sync.Mutex{}, } } // newAgentInvokeEvent forms a new AgentInvokeEvent from INVOKE request func newAgentInvokeEvent(req *interop.Invoke) (*model.AgentInvokeEvent, error) { deadlineMono, err := strconv.ParseInt(req.DeadlineNs, 10, 64) if err != nil { return nil, err } deadline := metering.MonoToEpoch(deadlineMono) / int64(time.Millisecond) return &model.AgentInvokeEvent{ AgentEvent: &model.AgentEvent{ EventType: "INVOKE", DeadlineMs: deadline, }, RequestID: req.ID, InvokedFunctionArn: req.InvokedFunctionArn, Tracing: model.NewXRayTracing(req.TraceID), }, nil } // RenderAgentEvent renders invoke event json for agent. func (s *InvokeRenderer) RenderAgentEvent(writer http.ResponseWriter, request *http.Request) error { event, err := newAgentInvokeEvent(s.invoke) if err != nil { return err } bytes, err := json.Marshal(event) if err != nil { return err } eventID := uuid.New() headers := writer.Header() headers.Set("Lambda-Extension-Event-Identifier", eventID.String()) headers.Set("Content-Type", "application/json") writer.WriteHeader(http.StatusOK) if _, err := writer.Write(bytes); err != nil { return err } return nil } func (s *InvokeRenderer) bufferInvokeRequest() error { s.requestMutex.Lock() defer s.requestMutex.Unlock() var err error = nil if s.requestBuffer.Len() == 0 { reader := io.LimitReader(s.invoke.Payload, interop.MaxPayloadSize) start := time.Now() _, err = s.requestBuffer.ReadFrom(reader) s.metrics = InvokeRendererMetrics{ ReadTime: time.Since(start), SizeBytes: s.requestBuffer.Len(), } } return err } // RenderRuntimeEvent renders invoke payload for runtime. func (s *InvokeRenderer) RenderRuntimeEvent(writer http.ResponseWriter, request *http.Request) error { invoke := s.invoke customerTraceID := s.tracingHeaderParser(s.ctx) cognitoIdentityJSON := "" if len(invoke.CognitoIdentityID) != 0 || len(invoke.CognitoIdentityPoolID) != 0 { cognitoJSON, err := json.Marshal(model.CognitoIdentity{ CognitoIdentityID: invoke.CognitoIdentityID, CognitoIdentityPoolID: invoke.CognitoIdentityPoolID, }) if err != nil { return err } cognitoIdentityJSON = string(cognitoJSON) } var deadlineHeader string if t, err := strconv.ParseInt(invoke.DeadlineNs, 10, 64); err == nil { deadlineHeader = strconv.FormatInt(metering.MonoToEpoch(t)/int64(time.Millisecond), 10) } else { log.WithError(err).Warn("Failed to compute deadline header") } renderInvokeHeaders(writer, invoke.ID, customerTraceID, invoke.ClientContext, cognitoIdentityJSON, invoke.InvokedFunctionArn, deadlineHeader, invoke.ContentType) if invoke.Payload != nil { if err := s.bufferInvokeRequest(); err != nil { return err } _, err := writer.Write(s.requestBuffer.Bytes()) return err } return nil } func (s *InvokeRenderer) GetMetrics() InvokeRendererMetrics { s.requestMutex.Lock() defer s.requestMutex.Unlock() return s.metrics } // ShutdownRenderer renderer for shutdown event. type ShutdownRenderer struct { AgentEvent model.AgentShutdownEvent } // RenderAgentEvent renders shutdown event for agent. func (s *ShutdownRenderer) RenderAgentEvent(w http.ResponseWriter, r *http.Request) error { bytes, err := json.Marshal(s.AgentEvent) if err != nil { return err } if _, err := w.Write(bytes); err != nil { return err } return nil } // RenderRuntimeEvent renders shutdown event for runtime. func (s *ShutdownRenderer) RenderRuntimeEvent(w http.ResponseWriter, r *http.Request) error { panic("We should SIGTERM runtime") } func renderInvokeHeaders(writer http.ResponseWriter, invokeID string, customerTraceID string, clientContext string, cognitoIdentity string, invokedFunctionArn string, deadlineMs string, contentType string) { setHeaderIfNotEmpty := func(headers http.Header, key string, value string) { if value != "" { headers.Set(key, value) } } headers := writer.Header() setHeaderIfNotEmpty(headers, "Lambda-Runtime-Aws-Request-Id", invokeID) setHeaderIfNotEmpty(headers, "Lambda-Runtime-Trace-Id", customerTraceID) setHeaderIfNotEmpty(headers, "Lambda-Runtime-Client-Context", clientContext) setHeaderIfNotEmpty(headers, "Lambda-Runtime-Cognito-Identity", cognitoIdentity) setHeaderIfNotEmpty(headers, "Lambda-Runtime-Invoked-Function-Arn", invokedFunctionArn) setHeaderIfNotEmpty(headers, "Lambda-Runtime-Deadline-Ms", deadlineMs) if contentType == "" { contentType = "application/json" } headers.Set("Content-Type", contentType) writer.WriteHeader(http.StatusOK) } // RenderRuntimeLogsResponse renders response from Telemetry API func RenderRuntimeLogsResponse(w http.ResponseWriter, respBody []byte, status int, headers map[string][]string) error { respHeaders := w.Header() for k, vals := range headers { for _, v := range vals { respHeaders.Add(k, v) } } w.WriteHeader(status) _, err := w.Write(respBody) return err } // RenderAccepted method for rendering accepted status response func RenderAccepted(w http.ResponseWriter, r *http.Request) { if err := RenderJSON(http.StatusAccepted, w, r, &model.StatusResponse{ Status: "OK", }); err != nil { log.WithError(err).Warn("Error while rendering response") http.Error(w, err.Error(), http.StatusInternalServerError) } }