forward/forwarder.go (127 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. // Package forward provides a mechanism to forward TChannel requests. package forward import ( "encoding/json" "sync" "time" log "github.com/uber-common/bark" "github.com/uber/ringpop-go/events" "github.com/uber/ringpop-go/logging" "github.com/uber/ringpop-go/shared" "github.com/uber/ringpop-go/util" "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/thrift" ) // A Sender is used to route the request to the proper destination, // the server returned by Lookup(key) type Sender interface { // WhoAmI should return the address of the local sender WhoAmI() (string, error) // Lookup should return the server the request belongs to Lookup(string) (string, error) } // Options for the creation of a forwarder type Options struct { Ctx thrift.Context MaxRetries int RerouteRetries bool RetrySchedule []time.Duration Timeout time.Duration Headers []byte } func (f *Forwarder) defaultOptions() *Options { return &Options{ MaxRetries: 3, RetrySchedule: []time.Duration{3 * time.Second, 6 * time.Second, 12 * time.Second}, Timeout: 3 * time.Second, } } func (f *Forwarder) mergeDefaultOptions(opts *Options) *Options { def := f.defaultOptions() if opts == nil { return def } var merged Options merged.MaxRetries = util.SelectInt(opts.MaxRetries, def.MaxRetries) merged.Timeout = util.SelectDuration(opts.Timeout, def.Timeout) merged.RerouteRetries = opts.RerouteRetries merged.RetrySchedule = opts.RetrySchedule if opts.RetrySchedule == nil { merged.RetrySchedule = def.RetrySchedule } merged.Headers = opts.Headers merged.Ctx = opts.Ctx return &merged } // A Forwarder is used to forward requests to their destinations type Forwarder struct { events.AsyncEventEmitter sender Sender channel shared.SubChannel logger log.Logger inflightLock sync.Mutex inflight int64 } // NewForwarder returns a new forwarder func NewForwarder(s Sender, ch shared.SubChannel) *Forwarder { logger := logging.Logger("forwarder") if address, err := s.WhoAmI(); err == nil { logger = logger.WithField("local", address) } return &Forwarder{ sender: s, channel: ch, logger: logger, } } func (f *Forwarder) incrementInflight() { f.inflightLock.Lock() f.inflight++ inflight := f.inflight f.inflightLock.Unlock() f.EmitEvent(InflightRequestsChangedEvent{inflight}) } func (f *Forwarder) decrementInflight() { f.inflightLock.Lock() pre := f.inflight f.inflight-- // make sure that we do not decrement below 0 if f.inflight < 0 { f.inflight = 0 f.EmitEvent(InflightRequestsMiscountEvent{InflightDecrement}) } inflight := f.inflight f.inflightLock.Unlock() if pre != inflight { f.EmitEvent(InflightRequestsChangedEvent{inflight}) } } // ForwardRequest forwards a request to the given service and endpoint returns the response. // Keys are used by the sender to lookup the destination on retry. If you have multiple keys // and their destinations diverge on a retry then the call is aborted. func (f *Forwarder) ForwardRequest(request []byte, destination, service, endpoint string, keys []string, format tchannel.Format, opts *Options) ([]byte, error) { f.EmitEvent(RequestForwardedEvent{}) f.incrementInflight() opts = f.mergeDefaultOptions(opts) rs := newRequestSender(f.sender, f, f.channel, request, keys, destination, service, endpoint, format, opts) b, err := rs.Send() f.decrementInflight() if err != nil { f.EmitEvent(FailedEvent{}) } else { f.EmitEvent(SuccessEvent{}) } return b, err } var ( // ForwardedHeaderName is the name used by the ringpop adapter to indicate // it is a forwarded request. ForwardedHeaderName = "ringpop-forward-keys" ) // SetForwardedHeader adds a header to the current thrift context indicating // that the call has been forwarded by another node in the ringpop ring. This // header is used when a remote call is received to determine if forwarding // checks needs to be applied. By not forwarding already forwarded calls we // prevent unbound forwarding in the ring in case of memebership disagreement. // The keys provided will be serialized as the value of the key and can be used // in the future to check if key inconsistencies are found while forwarding. // Currently this is not checked func SetForwardedHeader(ctx thrift.Context, keys []string) thrift.Context { // copy headers to make sure two calls do not leak headers to each other headers := make(map[string]string, len(ctx.Headers())+1) for key, value := range ctx.Headers() { headers[key] = value } if keys == nil { // prevent null serialization, but use an empty array instead keys = []string{} } keysBytes, _ := json.Marshal(keys) keysString := string(keysBytes) // set the header indicating the call is forwarded for the provided keys headers[ForwardedHeaderName] = keysString // return the ctx with new headrs return thrift.WithHeaders(ctx, headers) } // DeleteForwardedHeader takes the headers that came in via TChannel and looks // for the precense of a specific ringpop header to see if ringpop already // forwarded the message. If the header is present it will delete the header // from the context. The return value indicates if the header was present and // deleted func DeleteForwardedHeader(ctx thrift.Context) bool { _, ok := ctx.Headers()[ForwardedHeaderName] if ok { delete(ctx.Headers(), ForwardedHeaderName) } return ok }