runtime/tchannel_client.go (233 lines of code) (raw):
// Copyright (c) 2023 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package zanzibar
import (
"context"
"strings"
"time"
"github.com/pkg/errors"
"github.com/uber-go/tally"
"github.com/uber/tchannel-go"
"github.com/uber/zanzibar/runtime/ruleengine"
netContext "golang.org/x/net/context"
)
// Log field keys describing an outbound client call.
const (
// logFieldClientID is the log field key for the client ID.
logFieldClientID = "clientID"
// logFieldClientThriftMethod is the log field key for the thrift
// "service::method" of the client thrift spec.
logFieldClientThriftMethod = "clientThriftMethod"
// logFieldClientService is the log field key for the backend service
// corresponding to the client.
logFieldClientService = "clientService"
// logFieldClientMethod is the log field key for the method name of a
// particular client method call.
logFieldClientMethod = "clientMethod"
)
// TChannelClientOption is used when creating a new TChannelClient. Its fields
// are copied onto the client by NewTChannelClientContext.
type TChannelClientOption struct {
// ServiceName is the name used to obtain the default sub-channel from the
// tchannel.Channel and is reported as the target service.
ServiceName string
// ClientID identifies the client; it is exposed as TChannelClient.ClientID.
ClientID string
// Timeout is the overall timeout handed to the tchannel context builder,
// unless overridden by timeout/retry options carried on the request context.
Timeout time.Duration
// TimeoutPerAttempt is the per-attempt timeout placed in the tchannel
// retry options, unless overridden by options on the request context.
TimeoutPerAttempt time.Duration
// RoutingKey, when non-nil, is set on the tchannel context of every call.
RoutingKey *string
// MethodNames is a map from "ThriftService::method" to "ZanzibarMethodName",
// where ThriftService and method are from the service's Thrift IDL, and
// ZanzibarMethodName is the public method name exposed on the Zanzibar-generated
// client, from the zanzibar configuration. For example, if a client named FooClient
// has a methodMap of map[string]string{"Foo::bar":"Bar"}, then one can do
// `FooClient.Bar()` to issue a RPC to Thrift service `Foo`'s `bar` method.
MethodNames map[string]string
// RuleEngine, when non-nil, is consulted on each call to pick an alternate
// channel from the request headers; when it is nil or no rule matches, the
// default sub-channel for ServiceName is used.
RuleEngine ruleengine.RuleEngine
// HeaderPatterns lists the request header names that are looked up, in
// order, and matched against RuleEngine; the first match wins.
HeaderPatterns []string
// the header key that is used together with the request uuid on context to
// form a header when sending the request to downstream, e.g. "x-request-uuid"
RequestUUIDHeaderKey string
// AltChannelMap is a map for dynamic lookup of alternative channels. It is
// indexed by the first element of the value returned by RuleEngine.
AltChannelMap map[string]*tchannel.SubChannel
// MaxAttempts is the maximum number of attempts placed in the tchannel
// retry options for a client.
// NOTE(review): a comment in TChannelClient.call says that 0 makes tchannel
// default to 5 attempts — confirm against the tchannel-go version in use.
MaxAttempts int
}
// TChannelClient implements TChannelCaller and makes outgoing Thrift calls.
type TChannelClient struct {
// ClientID identifies this client in scope tags and error messages.
ClientID string
// ContextLogger is handed to every outbound call created by Call.
ContextLogger ContextLogger
// ch is the channel whose RunWithRetry drives each request.
ch *tchannel.Channel
// sc is the default sub-channel, obtained from ch for serviceName.
sc *tchannel.SubChannel
// scAlt is not assigned by NewTChannelClientContext in this file.
scAlt *tchannel.SubChannel
// serviceName is the target service name (TChannelClientOption.ServiceName).
serviceName string
// methodNames maps "ThriftService::method" to the Zanzibar method name.
methodNames map[string]string
// timeout is the default overall timeout for a call.
timeout time.Duration
// timeoutPerAttempt is the default per-attempt timeout for a call.
timeoutPerAttempt time.Duration
// routingKey, when non-nil, is set on the tchannel context of every call.
routingKey *string
// shardKey is not assigned by NewTChannelClientContext in this file; the
// shard key used by call is read from the request context instead.
shardKey *string
// metrics supplies the scope used for scope tags and is handed to every
// outbound call.
metrics ContextMetrics
// contextExtractor is stored by the constructor; it is not read in this file.
contextExtractor ContextExtractor
// requestUUIDHeaderKey is the header key under which the request UUID from
// the context is sent downstream.
requestUUIDHeaderKey string
// ruleEngine, when non-nil, selects an alternate channel per request.
ruleEngine ruleengine.RuleEngine
// headerPatterns lists the header names matched against ruleEngine.
headerPatterns []string
// altChannelMap holds the alternate sub-channels selectable by ruleEngine.
altChannelMap map[string]*tchannel.SubChannel
// maxAttempts is the default MaxAttempts placed in the tchannel retry options.
maxAttempts int
}
// NewTChannelClient builds a TChannelClient from a tally scope: the scope is
// wrapped with NewContextMetrics and everything else is passed through to
// NewTChannelClientContext unchanged.
//
// Deprecated: use NewTChannelClientContext instead.
func NewTChannelClient(
	ch *tchannel.Channel,
	contextLogger ContextLogger,
	scope tally.Scope,
	contextExtractor ContextExtractor,
	opt *TChannelClientOption,
) *TChannelClient {
	metrics := NewContextMetrics(scope)
	return NewTChannelClientContext(ch, contextLogger, metrics, contextExtractor, opt)
}
// NewTChannelClientContext returns a TChannelClient that makes calls over the
// given tchannel to the thrift service named by opt.ServiceName. The default
// sub-channel is looked up on ch once, here; every other setting is copied
// from opt or from the supplied collaborators.
func NewTChannelClientContext(
	ch *tchannel.Channel,
	contextLogger ContextLogger,
	metrics ContextMetrics,
	contextExtractor ContextExtractor,
	opt *TChannelClientOption,
) *TChannelClient {
	defaultSubChannel := ch.GetSubChannel(opt.ServiceName)

	return &TChannelClient{
		// Identity of the client and its target.
		ClientID:    opt.ClientID,
		serviceName: opt.ServiceName,
		methodNames: opt.MethodNames,

		// Transport.
		ch: ch,
		sc: defaultSubChannel,

		// Timeouts, retries and routing.
		timeout:           opt.Timeout,
		timeoutPerAttempt: opt.TimeoutPerAttempt,
		maxAttempts:       opt.MaxAttempts,
		routingKey:        opt.RoutingKey,

		// Rule-based alternate channel selection.
		ruleEngine:     opt.RuleEngine,
		headerPatterns: opt.HeaderPatterns,
		altChannelMap:  opt.AltChannelMap,

		// Observability and request context plumbing.
		ContextLogger:        contextLogger,
		metrics:              metrics,
		contextExtractor:     contextExtractor,
		requestUUIDHeaderKey: opt.RequestUUIDHeaderKey,
	}
}
// Call makes a RPC call to the given service.
//
// thriftService and methodName are joined as "thriftService::methodName" to
// form the endpoint sent over tchannel and to look up the Zanzibar method
// name in the client's method map. Scope tags for the client, method, target
// service and target endpoint are attached to ctx before the request is
// handed to call, whose results are returned unchanged.
func (c *TChannelClient) Call(
	ctx context.Context,
	thriftService, methodName string,
	reqHeaders map[string]string,
	req, resp RWTStruct,
) (success bool, resHeaders map[string]string, err error) {
	endpoint := thriftService + "::" + methodName

	ctx = WithScopeTagsDefault(
		ctx,
		map[string]string{
			scopeTagClient:          c.ClientID,
			scopeTagClientMethod:    methodName,
			scopeTagsTargetService:  c.serviceName,
			scopeTagsTargetEndpoint: endpoint,
		},
		c.metrics.Scope(),
	)

	outbound := &tchannelOutboundCall{
		client:        c,
		methodName:    c.methodNames[endpoint],
		serviceMethod: endpoint,
		reqHeaders:    reqHeaders,
		contextLogger: c.ContextLogger,
		metrics:       c.metrics,
	}

	return c.call(ctx, outbound, reqHeaders, req, resp)
}
// call performs the outbound request described by call, driving it through
// c.ch.RunWithRetry, and returns the success flag and response headers
// recorded on call together with any error.
//
// Timeout and retry settings default to the client's own configuration and
// are replaced by the options found on ctx (GetTimeoutAndRetryOptions) when
// those options carry a non-zero MaxAttempts.
//
// A tchannel.SystemError is returned as-is together with call.resHeaders; any
// other error is wrapped with client/method details and returned with nil
// response headers.
//
// When ctx carries a request UUID it is written into reqHeaders, so a non-nil
// map passed by the caller is modified.
func (c *TChannelClient) call(
ctx context.Context,
call *tchannelOutboundCall,
reqHeaders map[string]string,
req, resp RWTStruct,
) (success bool, resHeaders map[string]string, err error) {
// Runs after the results are set: err is the named result, so finish sees
// the final error. The duration entry is written into call.resHeaders, so
// the caller only observes it when that same map was the one returned.
// NOTE(review): call.duration is presumably populated by start/finish —
// confirm in tchannelOutboundCall.
defer func() {
call.finish(ctx, err)
if call.resHeaders == nil {
call.resHeaders = make(map[string]string)
}
call.resHeaders[ClientResponseDurationKey] = call.duration.String()
}()
timeoutAndRetryOptions := GetTimeoutAndRetryOptions(ctx)
call.start()
// Forward the request UUID downstream under the configured header key,
// allocating the header map if the caller passed nil.
reqUUID := RequestUUIDFromCtx(ctx)
if reqUUID != "" {
if reqHeaders == nil {
reqHeaders = make(map[string]string)
}
reqHeaders[c.requestUUIDHeaderKey] = reqUUID
}
// Start passing the MaxAttempt field which will be used while creating the RetryOptions.
// Note : No impact on the existing clients because MaxAttempt will be passed as 0 and it will default to 5 while retrying the execution.
// More details can be found at https://t3.uberinternal.com/browse/EDGE-8526
retryOpts := tchannel.RetryOptions{
TimeoutPerAttempt: c.timeoutPerAttempt,
MaxAttempts: c.maxAttempts,
}
// Override the timeout and retry config with the endpoint-level config.
// When MaxAttempts is 0, the endpoint-level config is assumed not to be provided.
timeout := c.timeout
if timeoutAndRetryOptions != nil && timeoutAndRetryOptions.MaxAttempts != 0 {
retryOpts = tchannel.RetryOptions{
TimeoutPerAttempt: timeoutAndRetryOptions.RequestTimeoutPerAttemptInMs,
MaxAttempts: timeoutAndRetryOptions.MaxAttempts,
}
timeout = timeoutAndRetryOptions.OverallTimeoutInMs
}
// Build the tchannel context: overall timeout, parent context and retry
// options always; routing key, routing delegate and shard key only when set.
ctxBuilder := tchannel.NewContextBuilder(timeout).
SetParentContext(ctx).
SetRetryOptions(&retryOpts)
if c.routingKey != nil {
ctxBuilder.SetRoutingKey(*c.routingKey)
}
rd := GetRoutingDelegateFromCtx(ctx)
if rd != "" {
ctxBuilder.SetRoutingDelegate(rd)
}
sk := GetShardKeyFromCtx(ctx)
if sk != "" {
ctxBuilder.SetShardKey(sk)
}
// ctx is reassigned here (only cancel is new), so the deferred finish above
// receives the built context.
ctx, cancel := ctxBuilder.Build()
defer cancel()
err = c.ch.RunWithRetry(ctx, func(ctx netContext.Context, rs *tchannel.RequestState) (cerr error) {
// Every attempt starts from a clean response state.
call.resHeaders = map[string]string{}
call.success = false
// Pick the sub-channel for this attempt; the returned ctx may carry a
// routing delegate chosen by the rule engine, which is read back below.
sc, ctx := c.getDynamicChannelWithFallback(reqHeaders, c.sc, ctx)
call.call, cerr = sc.BeginCall(ctx, call.serviceMethod, &tchannel.CallOptions{
Format: tchannel.Thrift,
ShardKey: GetShardKeyFromCtx(ctx),
RequestState: rs,
RoutingDelegate: GetRoutingDelegateFromCtx(ctx),
})
if cerr != nil {
return errors.Wrapf(
cerr, "Could not begin outbound %s.%s (%s %s) request",
call.client.ClientID, call.methodName, call.client.serviceName, call.serviceMethod,
)
}
// trace request
// reqHeaders is the enclosing function's variable, so this assignment
// persists across retry attempts.
reqHeaders = tchannel.InjectOutboundSpan(call.call.Response(), reqHeaders)
if cerr := call.writeReqHeaders(reqHeaders); cerr != nil {
return cerr
}
if cerr := call.writeReqBody(ctx, req); cerr != nil {
return cerr
}
response := call.call.Response()
if cerr = call.readResHeaders(response); cerr != nil {
return cerr
}
if cerr = call.readResBody(ctx, response, resp); cerr != nil {
return cerr
}
return cerr
})
if err != nil {
// Do not wrap system errors.
if _, ok := err.(tchannel.SystemError); ok {
return call.success, call.resHeaders, err
}
// Any other failure is wrapped and reported without response headers.
return call.success, nil, errors.Wrapf(
err, "Could not make outbound %s.%s (%s %s) response",
call.client.ClientID, call.methodName, call.client.serviceName, call.serviceMethod,
)
}
return call.success, call.resHeaders, err
}
// getDynamicChannelWithFallback selects the sub-channel to use for a request.
//
// The configured header patterns are checked in order. For each one present
// in reqHeaders, the rule engine is queried with the header name and the
// lower-cased header value; the first rule that matches decides the outcome.
// A matching rule is expected to yield a []string whose first element is a
// key of altChannelMap and whose optional second element is a routing
// delegate to attach to the returned context.
//
// The default channel sc and the unmodified ctx are returned when there is no
// rule engine, when no rule matches, or when the matching rule cannot be
// honoured (its value is not a non-empty []string, or it names a service with
// no usable entry in altChannelMap). The latter guards keep a misconfigured
// rule from panicking or from handing a nil sub-channel to the caller.
func (c *TChannelClient) getDynamicChannelWithFallback(reqHeaders map[string]string,
	sc *tchannel.SubChannel, ctx netContext.Context) (*tchannel.SubChannel, netContext.Context) {
	if c.ruleEngine == nil {
		return sc, ctx
	}
	for _, headerPattern := range c.headerPatterns {
		// this header is not present, so can't match a rule
		headerPatternVal, ok := reqHeaders[headerPattern]
		if !ok {
			continue
		}
		val, match := c.ruleEngine.GetValue(headerPattern, strings.ToLower(headerPatternVal))
		// if rule doesn't match, continue with a next input
		if !match {
			continue
		}
		// First match wins. If the rule value is malformed, fall back to the
		// default channel rather than panicking on the assertion or the index.
		serviceDetails, ok := val.([]string)
		if !ok || len(serviceDetails) == 0 {
			return sc, ctx
		}
		// The alternate channels are expected to have been registered at
		// construction time; if this one is missing, use the default channel
		// instead of returning nil.
		altCh, ok := c.altChannelMap[serviceDetails[0]]
		if !ok || altCh == nil {
			return sc, ctx
		}
		if len(serviceDetails) > 1 {
			ctx = WithRoutingDelegate(ctx, serviceDetails[1])
		}
		return altCh, ctx
	}
	// if nothing matches return the default channel
	return sc, ctx
}