replica/replicator.go (218 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. // Package replica extends Ringpop functionality by providing a mechanism to replicate // a request to multiple nodes in the ring. package replica import ( "errors" "sync" log "github.com/uber-common/bark" "github.com/uber/ringpop-go/forward" "github.com/uber/ringpop-go/logging" "github.com/uber/ringpop-go/shared" "github.com/uber/ringpop-go/util" "github.com/uber/tchannel-go" ) // FanoutMode defines how a replicator should fanout it's requests type FanoutMode int const ( // Parallel fanout mode for replicator read write requests. Sends out requests // in parallel. Parallel FanoutMode = iota // SerialSequential fanout mode for replicator read write requests. Sends out // requests one at a time going through the preference list sequentially SerialSequential // SerialBalanced fanout mode for replicator read write requests. Sends out // requests one at a time, going through the preference list in a random order SerialBalanced ) const ( read int = iota write ) // A Sender is used to lookup the destinations for requests given a key. type Sender interface { // Lookup should return a server address Lookup(string) (string, error) // LookupN should return n server addresses LookupN(string, int) ([]string, error) // WhoAmI should return the local address of the sender WhoAmI() (string, error) } // A Response is a response from a replicator read/write request. type Response struct { Destination string Keys []string Body []byte } // Options for sending a read/write replicator request type Options struct { NValue, RValue, WValue int FanoutMode FanoutMode } type callOptions struct { Keys []string Dests []string Request []byte KeysByDest map[string][]string Operation string Format tchannel.Format } // A Replicator is used to replicate a request across nodes such that they share // ownership of some data. type Replicator struct { sender Sender channel shared.SubChannel forwarder *forward.Forwarder logger log.Logger defaults *Options } func selectFanoutMode(mode FanoutMode) FanoutMode { switch mode { case Parallel, SerialSequential, SerialBalanced: return mode default: return Parallel } } func mergeDefaultOptions(opts *Options, def *Options) *Options { if opts == nil { return def } var merged Options merged.NValue = util.SelectInt(opts.NValue, def.NValue) merged.RValue = util.SelectInt(opts.RValue, def.RValue) merged.WValue = util.SelectInt(opts.WValue, def.WValue) merged.FanoutMode = selectFanoutMode(opts.FanoutMode) return &merged } // NewReplicator returns a new Replicator instance that makes calls with the given // SubChannel to the service defined by SubChannel.GetServiceName(). The given n/w/r // values will be used as defaults for the replicator when none are provided // Deprecation: logger is no longer used. func NewReplicator(s Sender, channel shared.SubChannel, logger log.Logger, opts *Options) *Replicator { f := forward.NewForwarder(s, channel) opts = mergeDefaultOptions(opts, &Options{3, 1, 3, Parallel}) logger = logging.Logger("replicator") if address, err := s.WhoAmI(); err == nil { logger = logger.WithField("local", address) } return &Replicator{s, channel, f, logger, opts} } // Read replicates a read request. It takes key(s) to be used for lookup of the requests // destination, a request to send, the operation to perform at the destination, options // for forwarding the request as well as options for ffanning out the request. It also // takes a response type, which is the type of struct that will be returned in each // responses.Body in response. Response type must be a concrete struct. The body field // will contain a pointer to that type of struct. func (r *Replicator) Read(keys []string, request []byte, operation string, fopts *forward.Options, opts *Options) (responses []Response, err error) { opts = mergeDefaultOptions(opts, r.defaults) return r.readWrite(read, keys, request, operation, fopts, opts) } // Write replicates a write request. It takes key(s) to be used for lookup of the requests // destination, a request to send, the operation to perform at the destination, options // for forwarding the request as well as options for ffanning out the request. It also // takes a response type, which is the type of struct that will be returned in each // responses.Body in response. Response type must be a concrete struct. The body field // will contain a pointer to that type of struct. func (r *Replicator) Write(keys []string, request []byte, operation string, fopts *forward.Options, opts *Options) (responses []Response, err error) { opts = mergeDefaultOptions(opts, r.defaults) return r.readWrite(write, keys, request, operation, fopts, opts) } func (r *Replicator) groupReplicas(keys []string, n int) (map[string][]string, map[string][]string) { destsByKey := make(map[string][]string) keysByDest := make(map[string][]string) for _, key := range keys { dests, _ := r.sender.LookupN(key, n) destsByKey[key] = dests if len(dests) == 0 { continue } for _, dest := range dests { keysByDest[dest] = append(keysByDest[dest], key) } } return destsByKey, keysByDest } func (r *Replicator) readWrite(rw int, keys []string, request []byte, operation string, fopts *forward.Options, opts *Options) ([]Response, error) { var rwValue int switch rw { case read: rwValue = opts.RValue case write: rwValue = opts.WValue } if rwValue > opts.NValue { return nil, errors.New("rw value cannot exceed n value") } destsByKey, keysByDest := r.groupReplicas(keys, opts.NValue) var dests []string switch len(keys) { case 1: // preserve preference list order dests = destsByKey[keys[0]] default: // else arbitary order for dest := range keysByDest { dests = append(dests, dest) } } if len(dests) < rwValue { return nil, errors.New("rw value not satisfied by destination") } var responses []Response var errs []error copts := &callOptions{ Keys: keys, Dests: dests, Request: request, KeysByDest: keysByDest, Operation: operation, } switch opts.FanoutMode { case Parallel: responses, errs = r.parallel(rwValue, copts, fopts, opts) case SerialSequential, SerialBalanced: responses, errs = r.serial(rwValue, copts, fopts, opts) } if len(responses) < rwValue { r.logger.WithFields(log.Fields{ "nValue": opts.NValue, "rwValue": rwValue, "numResponses": len(responses), "numErrors": len(errs), "errors": errs, }).Debug("replicator rw value not satisfied") return responses, errors.New("rw value not satisfied") } return responses, nil } // sends read/write requests in parallel func (r *Replicator) parallel(rwValue int, copts *callOptions, fopts *forward.Options, opts *Options) ([]Response, []error) { var responses struct { successes []Response errors []error sync.Mutex } var wg sync.WaitGroup for _, dest := range copts.Dests { wg.Add(1) go func(dest string) { res, err := r.forwardRequest(dest, copts, fopts) if err != nil { responses.Lock() responses.errors = append(responses.errors, err) responses.Unlock() wg.Done() return } responses.Lock() responses.successes = append(responses.successes, res) responses.Unlock() wg.Done() }(dest) } wg.Wait() return responses.successes, responses.errors } func (r *Replicator) serial(rwValue int, copts *callOptions, fopts *forward.Options, opts *Options) ([]Response, []error) { var responses []Response var errors []error if opts.FanoutMode == SerialBalanced { copts.Dests = util.ShuffleStrings(copts.Dests) } for _, dest := range copts.Dests { res, err := r.forwardRequest(dest, copts, fopts) if err != nil { errors = append(errors, err) continue } responses = append(responses, res) } return responses, errors } func (r *Replicator) forwardRequest(dest string, copts *callOptions, fopts *forward.Options) (Response, error) { var response Response var keys = copts.KeysByDest[dest] res, err := r.forwarder.ForwardRequest(copts.Request, dest, r.channel.ServiceName(), copts.Operation, keys, copts.Format, fopts) if err != nil { r.logger.WithFields(log.Fields{ "error": err, }).Warn("replicator read/write error") return response, err } response.Destination = dest response.Keys = keys response.Body = res return response, nil }