in replica/replicator.go [193:256]
func (r *Replicator) readWrite(rw int, keys []string, request []byte, operation string,
fopts *forward.Options, opts *Options) ([]Response, error) {
var rwValue int
switch rw {
case read:
rwValue = opts.RValue
case write:
rwValue = opts.WValue
}
if rwValue > opts.NValue {
return nil, errors.New("rw value cannot exceed n value")
}
destsByKey, keysByDest := r.groupReplicas(keys, opts.NValue)
var dests []string
switch len(keys) {
case 1:
// preserve preference list order
dests = destsByKey[keys[0]]
default:
// else arbitary order
for dest := range keysByDest {
dests = append(dests, dest)
}
}
if len(dests) < rwValue {
return nil, errors.New("rw value not satisfied by destination")
}
var responses []Response
var errs []error
copts := &callOptions{
Keys: keys,
Dests: dests,
Request: request,
KeysByDest: keysByDest,
Operation: operation,
}
switch opts.FanoutMode {
case Parallel:
responses, errs = r.parallel(rwValue, copts, fopts, opts)
case SerialSequential, SerialBalanced:
responses, errs = r.serial(rwValue, copts, fopts, opts)
}
if len(responses) < rwValue {
r.logger.WithFields(log.Fields{
"nValue": opts.NValue,
"rwValue": rwValue,
"numResponses": len(responses),
"numErrors": len(errs),
"errors": errs,
}).Debug("replicator rw value not satisfied")
return responses, errors.New("rw value not satisfied")
}
return responses, nil
}