func()

in replica/replicator.go [193:256]


func (r *Replicator) readWrite(rw int, keys []string, request []byte, operation string,
	fopts *forward.Options, opts *Options) ([]Response, error) {

	var rwValue int
	switch rw {
	case read:
		rwValue = opts.RValue
	case write:
		rwValue = opts.WValue
	}

	if rwValue > opts.NValue {
		return nil, errors.New("rw value cannot exceed n value")
	}

	destsByKey, keysByDest := r.groupReplicas(keys, opts.NValue)
	var dests []string
	switch len(keys) {
	case 1:
		// preserve preference list order
		dests = destsByKey[keys[0]]
	default:
		// else arbitary order
		for dest := range keysByDest {
			dests = append(dests, dest)
		}
	}

	if len(dests) < rwValue {
		return nil, errors.New("rw value not satisfied by destination")
	}

	var responses []Response
	var errs []error

	copts := &callOptions{
		Keys:       keys,
		Dests:      dests,
		Request:    request,
		KeysByDest: keysByDest,
		Operation:  operation,
	}

	switch opts.FanoutMode {
	case Parallel:
		responses, errs = r.parallel(rwValue, copts, fopts, opts)
	case SerialSequential, SerialBalanced:
		responses, errs = r.serial(rwValue, copts, fopts, opts)
	}

	if len(responses) < rwValue {
		r.logger.WithFields(log.Fields{
			"nValue":       opts.NValue,
			"rwValue":      rwValue,
			"numResponses": len(responses),
			"numErrors":    len(errs),
			"errors":       errs,
		}).Debug("replicator rw value not satisfied")

		return responses, errors.New("rw value not satisfied")
	}

	return responses, nil
}