runtime/tchannel_outbound_call.go (195 lines of code) (raw):

// Copyright (c) 2023 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package zanzibar

import (
	"context"
	"fmt"
	"time"

	"github.com/pkg/errors"
	"github.com/uber/tchannel-go"
	"go.uber.org/thriftrw/protocol/binary"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// tchannelOutboundCall carries the per-request state of one outbound TChannel
// call: the underlying tchannel call, the headers exchanged, timing
// information, and the logger/metrics sinks used when the call finishes.
type tchannelOutboundCall struct {
	client *TChannelClient
	// call is the underlying TChannel call; logFields tolerates it being nil.
	call          *tchannel.OutboundCall
	methodName    string
	serviceMethod string
	// success is set by readResHeaders to the negation of the response's
	// ApplicationError flag.
	success bool
	// startTime and finishTime are stamped by start and finish respectively;
	// duration is their difference, stored by finish.
	startTime  time.Time
	finishTime time.Time
	duration   time.Duration
	// reqHeaders is stored by writeReqHeaders, resHeaders by readResHeaders.
	// Both are emitted as log fields by logFields.
	reqHeaders    map[string]string
	resHeaders    map[string]string
	contextLogger ContextLogger
	metrics       ContextMetrics
}

// start records the time at which the outbound call begins.
func (c *tchannelOutboundCall) start() {
	c.startTime = time.Now()
}

// finish records the completion time, emits the outcome counter and latency
// metrics, and writes a log line for the call.
//
// A non-nil err is counted as a system error and tagged with the TChannel
// system error code of its cause; a nil err with success unset is counted as
// an application error; otherwise the call is counted as a success.
func (c *tchannelOutboundCall) finish(ctx context.Context, err error) {
	c.finishTime = time.Now()

	// emit metrics
	switch {
	case err != nil:
		errCause := tchannel.GetSystemErrorCode(errors.Cause(err))
		scopeTags := map[string]string{scopeTagError: errCause.MetricsKey()}
		// The tagged context is reused below, so the latency metrics and the
		// log line of a failed call carry the error tag as well.
		ctx = WithScopeTagsDefault(ctx, scopeTags, c.metrics.Scope())
		c.metrics.IncCounter(ctx, clientSystemErrors, 1)
	case !c.success:
		c.metrics.IncCounter(ctx, clientAppErrors, 1)
	default:
		c.metrics.IncCounter(ctx, clientSuccess, 1)
	}

	delta := c.finishTime.Sub(c.startTime)
	c.metrics.RecordTimer(ctx, clientLatency, delta)
	c.metrics.RecordHistogramDuration(ctx, clientLatencyHist, delta)
	c.duration = delta

	// write logs
	AppendLogFieldsToContext(ctx, c.logFields()...)
	if err == nil {
		c.contextLogger.DebugZ(ctx, "Finished an outgoing client TChannel request")
	} else {
		c.contextLogger.WarnZ(ctx, "Failed to send outgoing client TChannel request")
	}
}

// logFields returns the log fields describing this call: the remote peer
// address ("unknown" when there is no underlying call) followed by one field
// per request header and one per response header, each keyed by the
// corresponding header prefix joined to the header name with "-".
func (c *tchannelOutboundCall) logFields() []zapcore.Field {
	hostPort := "unknown"
	if c.call != nil {
		hostPort = c.call.RemotePeer().HostPort
	}

	// Build the slice directly at its final size instead of staging the
	// headers in an intermediate map.
	fields := make([]zapcore.Field, 0, 1+len(c.reqHeaders)+len(c.resHeaders))
	fields = append(fields, zap.String(logFieldClientRemoteAddr, hostPort))
	for k, v := range c.reqHeaders {
		key := fmt.Sprintf("%s-%s", logFieldClientRequestHeaderPrefix, k)
		fields = append(fields, zap.String(key, v))
	}
	for k, v := range c.resHeaders {
		key := fmt.Sprintf("%s-%s", logFieldClientResponseHeaderPrefix, k)
		fields = append(fields, zap.String(key, v))
	}
	return fields
}

// writeReqHeaders writes request headers to arg2 and remembers them on the
// call so they can be logged later. It returns a wrapped error if the arg2
// writer cannot be created, written to, or closed.
func (c *tchannelOutboundCall) writeReqHeaders(reqHeaders map[string]string) error {
	c.reqHeaders = reqHeaders

	twriter, err := c.call.Arg2Writer()
	if err != nil {
		return errors.Wrapf(
			err, "Could not create arg2writer for outbound %s.%s (%s %s) request",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	if err := WriteHeaders(twriter, reqHeaders); err != nil {
		// Best-effort close: the write failure is the error worth reporting.
		_ = twriter.Close()
		return errors.Wrapf(
			err, "Could not write headers for outbound %s.%s (%s %s) request",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	if err := twriter.Close(); err != nil {
		return errors.Wrapf(
			err, "Could not close arg2writer for outbound %s.%s (%s %s) request",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	return nil
}

// writeReqBody writes request body to arg3. The request is converted to its
// wire representation and binary-encoded into the arg3 writer; the
// clientRequest counter is incremented once the writer has been closed
// successfully.
func (c *tchannelOutboundCall) writeReqBody(ctx context.Context, req RWTStruct) error {
	structWireValue, err := req.ToWire()
	if err != nil {
		return errors.Wrapf(
			err, "Could not write request for outbound %s.%s (%s %s) request",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}

	twriter, err := c.call.Arg3Writer()
	if err != nil {
		return errors.Wrapf(
			err, "Could not create arg3writer for outbound %s.%s (%s %s) request",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	if err := binary.Default.Encode(structWireValue, twriter); err != nil {
		// Best-effort close: the encode failure is the error worth reporting.
		_ = twriter.Close()
		return errors.Wrapf(
			err, "Could not write request for outbound %s.%s (%s %s) request",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	if err := twriter.Close(); err != nil {
		return errors.Wrapf(
			err, "Could not close arg3writer for outbound %s.%s (%s %s) request",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}

	// request sent when arg3writer is closed
	c.metrics.IncCounter(ctx, clientRequest, 1)
	return nil
}

// readResHeaders read response headers from arg2, stores them on the call, and
// records whether the response is an application error. A tchannel.SystemError
// returned while opening the arg2 reader is passed through unwrapped; every
// other failure is wrapped with call context.
func (c *tchannelOutboundCall) readResHeaders(response *tchannel.OutboundCallResponse) error {
	treader, err := response.Arg2Reader()
	if err != nil {
		// Do not wrap system errors.
		if _, ok := err.(tchannel.SystemError); ok {
			return err
		}
		return errors.Wrapf(
			err, "Could not create arg2reader for outbound %s.%s (%s %s) response",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	if c.resHeaders, err = ReadHeaders(treader); err != nil {
		// Best-effort close: the read failure is the error worth reporting.
		_ = treader.Close()
		return errors.Wrapf(
			err, "Could not read headers for outbound %s.%s (%s %s) response",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	if err := EnsureEmpty(treader, "reading response headers"); err != nil {
		_ = treader.Close()
		return errors.Wrapf(
			err, "Could not ensure arg2reader is empty for outbound %s.%s (%s %s) response",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	if err := treader.Close(); err != nil {
		return errors.Wrapf(
			err, "Could not close arg2reader for outbound %s.%s (%s %s) response",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}

	// must have called Arg2Reader before calling ApplicationError
	c.success = !response.ApplicationError()
	return nil
}

// readResBody read response body from arg3 into resp. A failure to decode the
// struct increments the clientTchannelUnmarshalError counter; all failures are
// returned wrapped with call context.
func (c *tchannelOutboundCall) readResBody(ctx context.Context, response *tchannel.OutboundCallResponse, resp RWTStruct) error {
	treader, err := response.Arg3Reader()
	if err != nil {
		return errors.Wrapf(
			err, "Could not create arg3Reader for outbound %s.%s (%s %s) response",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	if err := ReadStruct(treader, resp); err != nil {
		// Best-effort close: the decode failure is the error worth reporting.
		_ = treader.Close()
		c.metrics.IncCounter(ctx, clientTchannelUnmarshalError, 1)
		return errors.Wrapf(
			err, "Could not read outbound %s.%s (%s %s) response",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	if err := EnsureEmpty(treader, "reading response body"); err != nil {
		_ = treader.Close()
		return errors.Wrapf(
			err, "Could not ensure arg3reader is empty for outbound %s.%s (%s %s) response",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	if err := treader.Close(); err != nil {
		return errors.Wrapf(
			err, "Could not close arg3reader for outbound %s.%s (%s %s) response",
			c.client.ClientID, c.methodName, c.client.serviceName, c.serviceMethod,
		)
	}
	return nil
}