runtime/client_http_request.go (211 lines of code) (raw):

// Copyright (c) 2023 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package zanzibar import ( "bytes" "context" "fmt" "io" "net/http" "strings" "time" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/uber/zanzibar/runtime/jsonwrapper" "go.uber.org/zap" ) var metricNormalizer = strings.NewReplacer("::", "--") // ClientHTTPRequest is the struct for making a single client request using an outbound http client. type ClientHTTPRequest struct { ClientID string ClientTargetEndpoint string MethodName string Metrics ContextMetrics client *HTTPClient httpReq *http.Request res *ClientHTTPResponse started bool startTime time.Time Logger *zap.Logger ContextLogger ContextLogger rawBody []byte defaultHeaders map[string]string ctx context.Context jsonWrapper jsonwrapper.JSONWrapper timeoutAndRetryOptions *TimeoutAndRetryOptions } // NewClientHTTPRequest allocates a ClientHTTPRequest. The ctx parameter is the context associated with the outbound requests. func NewClientHTTPRequest( ctx context.Context, clientID string, clientMethod string, clientTargetEndpoint string, client *HTTPClient, ) *ClientHTTPRequest { scopeTags := map[string]string{ scopeTagClientMethod: clientMethod, scopeTagClient: clientID, scopeTagsTargetEndpoint: metricNormalizer.Replace(clientTargetEndpoint), } ctx = WithScopeTagsDefault(ctx, scopeTags, client.contextMetrics.Scope()) req := &ClientHTTPRequest{ ClientID: clientID, MethodName: clientMethod, ClientTargetEndpoint: clientTargetEndpoint, Metrics: client.contextMetrics, client: client, ContextLogger: client.ContextLogger, defaultHeaders: client.DefaultHeaders, ctx: ctx, jsonWrapper: client.JSONWrapper, } req.res = NewClientHTTPResponse(req) req.timeoutAndRetryOptions = GetTimeoutAndRetryOptions(ctx) req.start() return req } // Start the request, do some metrics book keeping func (req *ClientHTTPRequest) start() { if req.started { /* coverage ignore next line */ req.ContextLogger.ErrorZ(req.ctx, "Cannot start ClientHTTPRequest twice") /* coverage ignore next line */ return } req.started = true req.startTime = time.Now() } // CheckHeaders verifies that the outbound request contains required headers func (req *ClientHTTPRequest) CheckHeaders(expected []string) error { if req.httpReq == nil { /* coverage ignore next line */ panic("must call `req.WriteJSON()` before `req.CheckHeaders()`") } actualHeaders := req.httpReq.Header for _, headerName := range expected { // headerName is case insensitive, http.Header Get canonicalize the key headerValue := actualHeaders.Get(headerName) if headerValue == "" { req.ContextLogger.WarnZ(req.ctx, "Got outbound request without mandatory header", zap.String("headerName", headerName), ) return errors.New("Missing mandatory header: " + headerName) } } return nil } // WriteJSON materialize the HTTP request with given method, url, headers and body. func (req *ClientHTTPRequest) WriteJSON( method, url string, headers map[string]string, body interface{}, ) error { var rawBody []byte if body != nil { var err error rawBody, err = req.jsonWrapper.Marshal(body) if err != nil { req.ContextLogger.ErrorZ(req.ctx, "Could not serialize request json", zap.Error(err)) return errors.Wrapf( err, "Could not serialize %s.%s request json", req.ClientID, req.MethodName, ) } } return req.WriteBytes(method, url, headers, rawBody) } // WriteBytes materialize the HTTP request with given method, url, headers and body. // Body is assumed to be a byte array.s func (req *ClientHTTPRequest) WriteBytes( method, url string, headers map[string]string, rawBody []byte, ) error { var httpReq *http.Request var httpErr error if rawBody != nil { req.rawBody = rawBody httpReq, httpErr = http.NewRequest(method, url, bytes.NewReader(rawBody)) } else { httpReq, httpErr = http.NewRequest(method, url, nil) } if httpErr != nil { req.ContextLogger.ErrorZ(req.ctx, "Could not create outbound request", zap.Error(httpErr)) return errors.Wrapf( httpErr, "Could not create outbound %s.%s request", req.ClientID, req.MethodName, ) } // Using `Add` over `Set` intentionally, allowing us to create a list // of headerValues for a given key. for headerKey, headerValue := range req.defaultHeaders { httpReq.Header.Set(headerKey, headerValue) } for k := range headers { httpReq.Header.Set(k, headers[k]) } req.httpReq = httpReq return nil } // Do will send the request out. func (req *ClientHTTPRequest) Do() (*ClientHTTPResponse, error) { opName := fmt.Sprintf("%s.%s(%s)", req.ClientID, req.MethodName, req.ClientTargetEndpoint) urlTag := opentracing.Tag{Key: "URL", Value: req.httpReq.URL} methodTag := opentracing.Tag{Key: "Method", Value: req.httpReq.Method} span, ctx := opentracing.StartSpanFromContext(req.ctx, opName, urlTag, methodTag) err := req.InjectSpanToHeader(span, opentracing.HTTPHeaders) if err != nil { /* coverage ignore next line */ req.ContextLogger.ErrorZ(req.ctx, "Fail to inject span to headers", zap.Error(err)) /* coverage ignore next line */ return nil, err } var retryCount int64 = 1 var res *http.Response // when timeoutAndRetryOptions per request is not configured, use default client level timeout if req.timeoutAndRetryOptions == nil || req.timeoutAndRetryOptions.MaxAttempts == 0 { res, err = req.client.Client.Do(req.httpReq.WithContext(ctx)) } else { res, retryCount, err = req.executeDoWithRetry(ctx) // new code for retry and timeout per ep level } span.Finish() if err != nil { req.ContextLogger.ErrorZ(req.ctx, fmt.Sprintf("Could not make http outbound %s.%s request", req.ClientID, req.MethodName), zap.Error(err)) return nil, errors.Wrapf(err, "errors while making outbound %s.%s request", req.ClientID, req.MethodName) } // emit metrics req.Metrics.IncCounter(req.ctx, clientRequest, retryCount) req.res.setRawHTTPResponse(res) return req.res, nil } // executeDoWithRetry will execute executeDo with retries func (req *ClientHTTPRequest) executeDoWithRetry(ctx context.Context) (*http.Response, int64, error) { var err error var res *http.Response var retryCount int64 = 0 for i := 0; i < req.timeoutAndRetryOptions.MaxAttempts; i++ { retryCount++ res, err = req.executeDo(ctx) if err == nil { return res, retryCount, nil } var shouldRetry = false // if attempts are pending, wait for backoff duration before next attempt if i+1 < req.timeoutAndRetryOptions.MaxAttempts { shouldRetry = req.client.CheckRetry(ctx, req.timeoutAndRetryOptions, res, err) } req.ContextLogger.Warn(ctx, "errors while making http outbound request", zap.Error(err), zap.String("clientId", req.ClientID), zap.String("methodName", req.MethodName), zap.Int64("attempt", retryCount), zap.Int("maxAttempts", req.timeoutAndRetryOptions.MaxAttempts), zap.Bool("shouldRetry", shouldRetry)) // TODO (future releases) - make retry conditional, inspect error/response and then retry // reassign body if req.rawBody != nil && len(req.rawBody) > 0 { req.httpReq.Body = io.NopCloser(bytes.NewBuffer(req.rawBody)) } // Break loop if no retries if !shouldRetry { break } } return nil, retryCount, err } // executeDo will send the request out with a timeout func (req *ClientHTTPRequest) executeDo(ctx context.Context) (*http.Response, error) { ctx, cancel := context.WithTimeout(ctx, req.timeoutAndRetryOptions.RequestTimeoutPerAttemptInMs) defer cancel() res, err := req.client.Client.Do(req.httpReq.WithContext(ctx)) // when no error, read body and capture before closing the connection if err == nil { req.res.setRawHTTPResponse(res) _, err = req.res.ReadAll() if err != nil { return nil, err } } return res, err } // InjectSpanToHeader will inject span to request header // This method is current used for unit tests // TODO: we need to set source and test code as same pkg name which would makes UTs easier func (req *ClientHTTPRequest) InjectSpanToHeader(span opentracing.Span, format interface{}) error { carrier := opentracing.HTTPHeadersCarrier(req.httpReq.Header) if err := span.Tracer().Inject(span.Context(), format, carrier); err != nil { req.ContextLogger.ErrorZ(req.ctx, "Failed to inject tracing span.", zap.Error(err)) return err } return nil }