runtime/server_http_response.go (253 lines of code) (raw):

// Copyright (c) 2023 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zanzibar import ( "context" "fmt" "net/http" "strconv" "strings" "time" "github.com/buger/jsonparser" "github.com/pkg/errors" "github.com/uber-go/tally" "github.com/uber/zanzibar/runtime/jsonwrapper" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) // ServerHTTPResponse struct manages server http response type ServerHTTPResponse struct { Request *ServerHTTPRequest StatusCode int responseWriter http.ResponseWriter flushed bool finished bool finishTime time.Time DownstreamFinishTime time.Duration ClientType string pendingBodyBytes []byte pendingBodyObj interface{} pendingStatusCode int contextLogger ContextLogger scope tally.Scope jsonWrapper jsonwrapper.JSONWrapper Err error } // NewServerHTTPResponse is helper function to alloc ServerHTTPResponse func NewServerHTTPResponse( w http.ResponseWriter, req *ServerHTTPRequest, ) *ServerHTTPResponse { return &ServerHTTPResponse{ Request: req, StatusCode: 200, responseWriter: w, contextLogger: req.contextLogger, scope: req.scope, jsonWrapper: req.jsonWrapper, } } // finish will handle final logic, like metrics func (res *ServerHTTPResponse) finish(ctx context.Context) { logFields := GetLogFieldsFromCtx(ctx) if !res.Request.started { /* coverage ignore next line */ res.contextLogger.Error(ctx, "Forgot to start server response", append(logFields, zap.String("path", res.Request.URL.Path))..., ) /* coverage ignore next line */ return } if res.finished { /* coverage ignore next line */ res.contextLogger.Error(ctx, "Finished a server response multiple times", append(logFields, zap.String("path", res.Request.URL.Path))..., ) /* coverage ignore next line */ return } res.finished = true res.finishTime = time.Now() _, known := knownStatusCodes[res.StatusCode] tagged := res.scope.Tagged(map[string]string{ scopeTagStatus: fmt.Sprintf("%d", res.StatusCode), // no need to put this tag on the context because this is the end of response life cycle scopeTagClientType: res.ClientType, }) delta := 
res.finishTime.Sub(res.Request.startTime) tagged.Timer(endpointLatency).Record(delta) tagged.Histogram(endpointLatencyHist, tally.DefaultBuckets).RecordDuration(delta) if res.DownstreamFinishTime != 0 { overhead := delta - res.DownstreamFinishTime overheadRatio := overhead.Seconds() / delta.Seconds() tagged.Timer(endpointOverheadLatency).Record(overhead) tagged.Histogram(endpointOverheadLatencyHist, tally.DefaultBuckets).RecordDuration(overhead) tagged.Gauge(endpointOverheadRatio).Update(overheadRatio) } if !known { res.contextLogger.Error(ctx, "Unknown status code", append(logFields, zap.Int("UnknownStatusCode", res.StatusCode))..., ) } else { tagged.Counter(endpointStatus).Inc(1) } logFn := res.contextLogger.Debug if !known || res.StatusCode >= 400 && res.StatusCode < 600 { tagged.Counter(endpointAppErrors).Inc(1) logFn = res.contextLogger.WarnZ } span := res.Request.GetSpan() if span != nil { span.Finish() } logFn(ctx, fmt.Sprintf("Finished an incoming server HTTP request with %d status code", res.StatusCode), append(logFields, serverHTTPLogFields(res.Request, res)...)..., ) } func serverHTTPLogFields(req *ServerHTTPRequest, res *ServerHTTPResponse) []zapcore.Field { fields := []zapcore.Field{ zap.Int(logFieldResponseStatusCode, res.StatusCode), } for k, v := range res.Headers() { if len(v) > 0 { fields = append(fields, zap.String( fmt.Sprintf("%s-%s", logFieldEndpointResponseHeaderPrefix, k), strings.Join(v, ", "), )) } } if res.Err != nil { fields = append(fields, zap.Error(res.Err)) cause := errors.Cause(res.Err) if cause != nil && cause != res.Err { fields = append(fields, zap.NamedError("errorCause", cause)) } } return fields } // SendErrorString helper to send an error string func (res *ServerHTTPResponse) SendErrorString( statusCode int, errMsg string, ) { res.WriteJSONBytes(statusCode, nil, []byte(`{"error":"`+errMsg+`"}`), ) } // SendError helper to send an server error message, propagates underlying cause to logs etc. 
func (res *ServerHTTPResponse) SendError( statusCode int, errMsg string, errCause error, ) { res.Err = errCause res.WriteJSONBytes(statusCode, nil, []byte(`{"error":"`+errMsg+`"}`), ) } // WriteBytes writes a byte[] slice that is valid Response func (res *ServerHTTPResponse) WriteBytes( statusCode int, headers Header, bytes []byte, ) { if headers != nil { for _, k := range headers.Keys() { v, ok := headers.Get(k) if ok { res.responseWriter.Header().Set(k, v) } } } res.pendingStatusCode = statusCode res.pendingBodyBytes = bytes } // WriteJSONBytes writes a byte[] slice that is valid json to Response func (res *ServerHTTPResponse) WriteJSONBytes( statusCode int, headers Header, bytes []byte, ) { if headers == nil { headers = ServerHTTPHeader{} } headers.Add("content-type", "application/json") res.WriteBytes(statusCode, headers, bytes) } // MarshalResponseJSON serializes a json serializable into bytes func (res *ServerHTTPResponse) MarshalResponseJSON(body interface{}) []byte { ctx := res.Request.Context() if body == nil { res.SendError(500, "Could not serialize json response", errors.New("No Body JSON")) res.contextLogger.Error(ctx, "Could not serialize nil pointer body") return nil } bytes, err := res.jsonWrapper.Marshal(body) if err != nil { res.SendError(500, "Could not serialize json response", err) res.contextLogger.Error(ctx, "Could not serialize json response", zap.Error(err)) return nil } return bytes } // SendResponse sets content-type if not present and fills Response func (res *ServerHTTPResponse) SendResponse(statusCode int, headers Header, body interface{}, bytes []byte) { contentTypePresent := false if headers != nil { for _, k := range headers.Keys() { v, ok := headers.Get(k) if ok { if k == "Content-Type" { contentTypePresent = true } res.responseWriter.Header().Set(k, v) } } } // Set the content-type to application/json if not already available if !contentTypePresent { res.responseWriter.Header(). 
Set("content-type", "application/json") } res.pendingStatusCode = statusCode res.pendingBodyBytes = bytes res.pendingBodyObj = body } // WriteJSON writes a json serializable struct to Response func (res *ServerHTTPResponse) WriteJSON( statusCode int, headers Header, body interface{}, ) { bytes := res.MarshalResponseJSON(body) if bytes == nil { return } res.SendResponse(statusCode, headers, body, bytes) } // PeekBody allows for inspecting a key path inside the body // that is not flushed yet. This is useful for response middlewares // that want to inspect the response body. func (res *ServerHTTPResponse) PeekBody( keys ...string, ) ([]byte, jsonparser.ValueType, error) { value, valueType, _, err := jsonparser.Get( res.pendingBodyBytes, keys..., ) if err != nil { return nil, -1, err } return value, valueType, nil } // Flush will write the body to the response. Before flush is called // the body is pending. A pending body allows a response middleware to // write a different body. func (res *ServerHTTPResponse) flush(ctx context.Context) { if res.flushed { /* coverage ignore next line */ res.contextLogger.Error(ctx, "Flushed a server response multiple times", zap.String("path", res.Request.URL.Path), ) /* coverage ignore next line */ return } res.flushed = true res.writeHeader(res.pendingStatusCode) if _, noContent := noContentStatusCodes[res.pendingStatusCode]; !noContent { res.writeBytes(res.pendingBodyBytes) } res.finish(ctx) } func (res *ServerHTTPResponse) writeHeader(statusCode int) { res.StatusCode = statusCode res.responseWriter.WriteHeader(statusCode) } // WriteBytes writes raw bytes to output func (res *ServerHTTPResponse) writeBytes(bytes []byte) { _, err := res.responseWriter.Write(bytes) if err != nil { /* coverage ignore next line */ res.contextLogger.Error(res.Request.Context(), "Could not write string to resp body", zap.Error(err), zap.String("bytesLength", strconv.Itoa(len(bytes))), ) } } // GetPendingResponse lets you read the pending body bytes, obj 
and status code // which isn't sent back yet. func (res *ServerHTTPResponse) GetPendingResponse() ([]byte, int) { return res.pendingBodyBytes, res.pendingStatusCode } // GetPendingResponseObject lets you read the pending body object // which isn't sent back yet. func (res *ServerHTTPResponse) GetPendingResponseObject() interface{} { return res.pendingBodyObj } // Headers returns the underlying http response's headers func (res *ServerHTTPResponse) Headers() http.Header { return res.responseWriter.Header() }