grpcgcp/gcp_balancer.go
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpcgcp
import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
)
var _ balancer.Balancer = (*gcpBalancer)(nil) // Ensure gcpBalancer implements Balancer
const (
// Name is the name of the grpc_gcp balancer.
Name = "grpc_gcp"
healthCheckEnabled = true
defaultMinSize = 1
defaultMaxSize = 4
defaultMaxStreams = 100
)
func init() {
balancer.Register(newBuilder())
}
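// A client selects this balancer by naming it in the service config's
// loadBalancingConfig. A minimal sketch (not part of this package): target and
// apiConfigJSON are placeholders for your endpoint and your ApiConfig encoded
// as JSON, and the package's client interceptors are attached so that affinity
// keys and stream counts are reported to the picker.
//
//	conn, err := grpc.Dial(
//		target,
//		grpc.WithDefaultServiceConfig(
//			fmt.Sprintf(`{"loadBalancingConfig": [{"grpc_gcp": %s}]}`, apiConfigJSON),
//		),
//		grpc.WithUnaryInterceptor(grpcgcp.GCPUnaryClientInterceptor),
//		grpc.WithStreamInterceptor(grpcgcp.GCPStreamClientInterceptor),
//	)
//	if err != nil {
//		log.Fatalf("failed to dial: %v", err)
//	}
//	defer conn.Close()
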
// gcpBalancerBuilder builds gcpBalancer instances and parses the grpc_gcp
// balancer configuration.
type gcpBalancerBuilder struct {
balancer.ConfigParser
}
// GCPBalancerConfig is the load balancing config of the grpc_gcp balancer.
// It wraps the ApiConfig proto parsed from the service config JSON.
type GCPBalancerConfig struct {
serviceconfig.LoadBalancingConfig
*pb.ApiConfig
}
func (bb *gcpBalancerBuilder) Build(
cc balancer.ClientConn,
opt balancer.BuildOptions,
) balancer.Balancer {
gb := &gcpBalancer{
cc: cc,
methodCfg: make(map[string]*pb.AffinityConfig),
affinityMap: make(map[string]balancer.SubConn),
fallbackMap: make(map[string]balancer.SubConn),
scRefs: make(map[balancer.SubConn]*subConnRef),
scStates: make(map[balancer.SubConn]connectivity.State),
refreshingScRefs: make(map[balancer.SubConn]*subConnRef),
scRefList: []*subConnRef{},
rrRefId: ^uint32(0), // Max uint32 so that the first increment wraps around to index 0.
csEvltr: &connectivityStateEvaluator{},
// Initialize picker to a picker that always returns
// ErrNoSubConnAvailable, because when the state of a SubConn changes,
// we may call UpdateState with this picker.
picker: newErrPicker(balancer.ErrNoSubConnAvailable),
}
gb.log = NewGCPLogger(compLogger, fmt.Sprintf("[gcpBalancer %p]", gb))
return gb
}
func (*gcpBalancerBuilder) Name() string {
return Name
}
// ParseConfig converts raw json config into GCPBalancerConfig.
// This is called by ClientConn on any load balancer config update.
// After parsing the config, ClientConn calls UpdateClientConnState passing the config.
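// A minimal sketch of the raw message this method receives, i.e. the value of
// the "grpc_gcp" key inside loadBalancingConfig (field names follow the
// protojson mapping of the ApiConfig proto; the method name and values are
// illustrative, not defaults):
//
//	{
//	  "channelPool": {
//	    "minSize": 2,
//	    "maxSize": 10,
//	    "maxConcurrentStreamsLowWatermark": 50
//	  },
//	  "method": [{
//	    "name": ["/some.package.Service/SomeMethod"],
//	    "affinity": {"command": "BOUND", "affinityKey": "name"}
//	  }]
//	}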
func (*gcpBalancerBuilder) ParseConfig(j json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
c := &GCPBalancerConfig{
ApiConfig: &pb.ApiConfig{},
}
err := protojson.Unmarshal(j, c)
return c, err
}
// newBuilder creates a new grpcgcp balancer builder.
func newBuilder() balancer.Builder {
return &gcpBalancerBuilder{}
}
// connectivityStateEvaluator is updated by SubConns as their states
// transition; based on these transitions it evaluates the aggregated
// state of the ClientConn.
type connectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
// recordTransition records a state change of a single SubConn and, based on
// the updated counters, evaluates what the aggregated state should be.
// It only transitions between Ready, Connecting and TransientFailure. The
// other states, Idle and Shutdown, are entered by the ClientConn itself:
// before any SubConn is created the ClientConn is Idle, and when the
// ClientConn closes it is in the Shutdown state.
//
// recordTransition should only be called synchronously from the same goroutine.
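// For example, if one SubConn transitions from Connecting to Ready while two
// others stay in TransientFailure, numReady becomes 1 and the aggregated state
// becomes Ready.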
func (cse *connectivityStateEvaluator) recordTransition(
oldState,
newState connectivity.State,
) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for newState (the -1 relies on uint64 wraparound).
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready
}
if cse.numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}
// subConnRef keeps a reference to the actual SubConn along with its
// affinity count, streams count and unresponsiveness-tracking fields.
type subConnRef struct {
subConn balancer.SubConn
stateSignal chan struct{} // This channel is closed and re-created when subConn or its state changes.
affinityCnt int32 // Keeps track of the number of keys bound to the subConn.
streamsCnt int32 // Keeps track of the number of streams opened on the subConn.
lastResp time.Time // Timestamp of the last response from the server.
deCalls uint32 // Keeps track of deadline exceeded calls since last response.
refreshing bool // If this subconn is in the process of refreshing.
refreshCnt uint32 // Number of refreshes since last response.
}
func (ref *subConnRef) getAffinityCnt() int32 {
return atomic.LoadInt32(&ref.affinityCnt)
}
func (ref *subConnRef) getStreamsCnt() int32 {
return atomic.LoadInt32(&ref.streamsCnt)
}
func (ref *subConnRef) affinityIncr() {
atomic.AddInt32(&ref.affinityCnt, 1)
}
func (ref *subConnRef) affinityDecr() {
atomic.AddInt32(&ref.affinityCnt, -1)
}
func (ref *subConnRef) streamsIncr() {
atomic.AddInt32(&ref.streamsCnt, 1)
}
func (ref *subConnRef) streamsDecr() {
atomic.AddInt32(&ref.streamsCnt, -1)
}
func (ref *subConnRef) deCallsInc() uint32 {
return atomic.AddUint32(&ref.deCalls, 1)
}
func (ref *subConnRef) gotResp() {
ref.lastResp = time.Now()
atomic.StoreUint32(&ref.deCalls, 0)
ref.refreshCnt = 0
}
type gcpBalancer struct {
cfg *GCPBalancerConfig
methodCfg map[string]*pb.AffinityConfig
addrs []resolver.Address
cc balancer.ClientConn
csEvltr *connectivityStateEvaluator
state connectivity.State
mu sync.RWMutex
affinityMap map[string]balancer.SubConn
fallbackMap map[string]balancer.SubConn
scStates map[balancer.SubConn]connectivity.State
scRefs map[balancer.SubConn]*subConnRef
scRefList []*subConnRef
rrRefId uint32
// Maps a fresh replacement SubConn to the subConnRef whose SubConn it will replace once ready.
refreshingScRefs map[balancer.SubConn]*subConnRef
// Unresponsive detection enabled flag.
unresponsiveDetection bool
picker balancer.Picker
log grpclog.LoggerV2
}
// initializeConfig clones the provided config (or creates an empty one),
// applies defaults to the channel pool settings, builds the per-method
// affinity map and makes sure the pool has at least the minimum number of
// SubConns.
func (gb *gcpBalancer) initializeConfig(cfg *GCPBalancerConfig) {
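// With an empty config and the defaults above, the net effect is a pool of
// 1 to 4 SubConns with a low watermark of 100 concurrent streams per SubConn,
// no method affinity, and unresponsive detection disabled.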
gb.cfg = &GCPBalancerConfig{
ApiConfig: &pb.ApiConfig{
ChannelPool: &pb.ChannelPoolConfig{},
},
}
if cfg != nil && cfg.ApiConfig != nil {
gb.cfg = &GCPBalancerConfig{
ApiConfig: proto.Clone(cfg.ApiConfig).(*pb.ApiConfig),
}
}
if gb.cfg.GetChannelPool() == nil {
gb.cfg.ChannelPool = &pb.ChannelPoolConfig{}
}
cp := gb.cfg.GetChannelPool()
if cp.GetMinSize() == 0 {
cp.MinSize = defaultMinSize
}
if cp.GetMaxSize() == 0 {
cp.MaxSize = defaultMaxSize
}
if cp.GetMaxConcurrentStreamsLowWatermark() == 0 {
cp.MaxConcurrentStreamsLowWatermark = defaultMaxStreams
}
mp := make(map[string]*pb.AffinityConfig)
methodCfgs := gb.cfg.GetMethod()
for _, methodCfg := range methodCfgs {
methodNames := methodCfg.GetName()
affinityCfg := methodCfg.GetAffinity()
if methodNames != nil && affinityCfg != nil {
for _, method := range methodNames {
mp[method] = affinityCfg
}
}
}
gb.methodCfg = mp
gb.unresponsiveDetection = cp.GetUnresponsiveCalls() > 0 && cp.GetUnresponsiveDetectionMs() > 0
gb.enforceMinSize()
}
func (gb *gcpBalancer) enforceMinSize() {
for len(gb.scRefs) < int(gb.cfg.GetChannelPool().GetMinSize()) {
gb.addSubConn()
}
}
// UpdateClientConnState is called by gRPC with the resolved addresses and the
// balancer config. It initializes the config on the first update and then
// updates the addresses of existing SubConns (or creates the first one).
func (gb *gcpBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
gb.mu.Lock()
defer gb.mu.Unlock()
addrs := ccs.ResolverState.Addresses
if gb.log.V(FINE) {
gb.log.Infoln("got new resolved addresses: ", addrs, " and balancer config: ", ccs.BalancerConfig)
}
gb.addrs = addrs
if gb.cfg == nil {
cfg, ok := ccs.BalancerConfig.(*GCPBalancerConfig)
if !ok && ccs.BalancerConfig != nil {
return fmt.Errorf("provided config is not GCPBalancerConfig: %v", ccs.BalancerConfig)
}
gb.initializeConfig(cfg)
}
if len(gb.scRefs) == 0 {
// gb.mu is already held here, so add the SubConn directly instead of calling
// newSubConn, which acquires the same (non-reentrant) mutex.
gb.addSubConn()
return nil
}
}
for _, scRef := range gb.scRefs {
// TODO(weiranf): update streams count when new addrs resolved?
scRef.subConn.UpdateAddresses(addrs)
scRef.subConn.Connect()
}
return nil
}
func (gb *gcpBalancer) ResolverError(err error) {
gb.log.Warningf("ResolverError: %v", err)
}
// getConnectionPoolSize returns the current number of SubConns in the pool.
func (gb *gcpBalancer) getConnectionPoolSize() int {
// TODO(golobokov): replace this with locked increase of subconns.
gb.mu.Lock()
defer gb.mu.Unlock()
return len(gb.scRefs)
}
// newSubConn creates a new SubConn using cc.NewSubConn and initializes its
// subConnRef, but only if none of the existing subconns are Connecting or Idle.
func (gb *gcpBalancer) newSubConn() {
gb.mu.Lock()
defer gb.mu.Unlock()
// Newly created subconns may still be connecting (or idle); wait on those
// instead of creating yet another one.
for _, scState := range gb.scStates {
if scState == connectivity.Connecting || scState == connectivity.Idle {
return
}
}
gb.addSubConn()
}
// addSubConn creates a new SubConn using cc.NewSubConn and initializes the
// subConnRef. Must be called while holding the mutex lock.
func (gb *gcpBalancer) addSubConn() {
sc, err := gb.cc.NewSubConn(
gb.addrs,
balancer.NewSubConnOptions{HealthCheckEnabled: healthCheckEnabled},
)
if err != nil {
gb.log.Errorf("failed to NewSubConn: %v", err)
return
}
gb.scRefs[sc] = &subConnRef{
subConn: sc,
stateSignal: make(chan struct{}),
lastResp: time.Now(),
}
gb.scStates[sc] = connectivity.Idle
gb.scRefList = append(gb.scRefList, gb.scRefs[sc])
sc.Connect()
}
// getReadySubConnRef returns a subConnRef and a bool. The bool indicates
// whether the boundKey exists in the affinityMap. A nil subConnRef with true
// means the bound subconn is not READY yet (and no fallback was available);
// nil with false means there is no binding for the key.
func (gb *gcpBalancer) getReadySubConnRef(boundKey string) (*subConnRef, bool) {
gb.mu.Lock()
defer gb.mu.Unlock()
if sc, ok := gb.affinityMap[boundKey]; ok {
if gb.scStates[sc] != connectivity.Ready {
// The bound subconn is not ready. If fallback is enabled, use a previously
// mapped ready subconn or the least busy one; otherwise return nil so the
// picker reports ErrNoSubConnAvailable.
if gb.cfg.GetChannelPool().GetFallbackToReady() {
if sc, ok := gb.fallbackMap[boundKey]; ok {
return gb.scRefs[sc], true
}
// Try to create fallback mapping.
if scRef, err := gb.picker.(*gcpPicker).getLeastBusySubConnRef(); err == nil {
gb.fallbackMap[boundKey] = scRef.subConn
return scRef, true
}
}
return nil, true
}
return gb.scRefs[sc], true
}
return nil, false
}
// getSubConnRoundRobin returns the next subConnRef in round-robin order,
// waiting until its SubConn becomes ready or the call context is done.
func (gb *gcpBalancer) getSubConnRoundRobin(ctx context.Context) *subConnRef {
if len(gb.scRefList) == 0 {
gb.newSubConn()
}
scRef := gb.scRefList[atomic.AddUint32(&gb.rrRefId, 1)%uint32(len(gb.scRefList))]
gb.mu.RLock()
if state := gb.scStates[scRef.subConn]; state == connectivity.Ready {
gb.mu.RUnlock()
return scRef
} else {
grpclog.Infof("grpcgcp.gcpBalancer: scRef is not ready: %v", state)
}
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
// Wait until SubConn is ready or call context is done.
for gb.scStates[scRef.subConn] != connectivity.Ready {
sigChan := scRef.stateSignal
gb.mu.RUnlock()
select {
case <-ctx.Done():
return scRef
case <-ticker.C:
case <-sigChan:
}
gb.mu.RLock()
}
gb.mu.RUnlock()
return scRef
}
// bindSubConn binds the given affinity key to an existing subConnRef.
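// For example (illustrative; the actual commands come from the calling
// method's AffinityConfig): a call configured with the BIND command binds the
// affinity key extracted from its response to the SubConn that served it, so
// that later calls configured as BOUND and carrying the same key are routed to
// that SubConn until an UNBIND call removes the mapping.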
func (gb *gcpBalancer) bindSubConn(bindKey string, sc balancer.SubConn) {
gb.mu.Lock()
defer gb.mu.Unlock()
_, ok := gb.affinityMap[bindKey]
if !ok {
gb.affinityMap[bindKey] = sc
}
gb.scRefs[sc].affinityIncr()
}
// unbindSubConn removes the existing binding associated with the key.
func (gb *gcpBalancer) unbindSubConn(boundKey string) {
gb.mu.Lock()
defer gb.mu.Unlock()
boundSC, ok := gb.affinityMap[boundKey]
if ok {
gb.scRefs[boundSC].affinityDecr()
delete(gb.affinityMap, boundKey)
}
}
// regeneratePicker takes a snapshot of the balancer and generates a picker
// from it. The picker is
// - an errPicker with ErrTransientFailure if the balancer is in TransientFailure,
// - a gcpPicker with all READY SubConns otherwise.
func (gb *gcpBalancer) regeneratePicker() {
if gb.state == connectivity.TransientFailure {
gb.picker = newErrPicker(balancer.ErrTransientFailure)
return
}
readyRefs := []*subConnRef{}
// Select ready subConns from subConn map.
for sc, scState := range gb.scStates {
if scState == connectivity.Ready {
readyRefs = append(readyRefs, gb.scRefs[sc])
}
}
gb.picker = newGCPPicker(readyRefs, gb)
}
// UpdateSubConnState handles SubConn connectivity state changes. It completes
// any in-flight refresh for the SubConn, updates the aggregated state,
// maintains the fallback mappings and regenerates the picker when the ready
// set or the aggregated state changes.
func (gb *gcpBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
gb.mu.Lock()
defer gb.mu.Unlock()
s := scs.ConnectivityState
if scRef, found := gb.refreshingScRefs[sc]; found {
if gb.log.V(FINE) {
gb.log.Infof("handle replacement SubConn state change: %p, %v", sc, s)
}
if s != connectivity.Ready {
// Ignore the replacement sc until it's ready.
return
}
// Replace SubConn of the scRef with the fresh SubConn (sc) concluding
// the refresh process initiated by refresh(*subConnRef).
oldSc := scRef.subConn
gb.scStates[sc] = gb.scStates[oldSc]
delete(gb.refreshingScRefs, sc)
delete(gb.scRefs, oldSc)
delete(gb.scStates, oldSc)
gb.scRefs[sc] = scRef
scRef.subConn = sc
scRef.deCalls = 0
scRef.lastResp = time.Now()
scRef.refreshing = false
scRef.refreshCnt++
gb.cc.RemoveSubConn(oldSc)
}
if gb.log.V(FINE) {
gb.log.Infof("handle SubConn state change: %p, %v", sc, s)
}
oldS, ok := gb.scStates[sc]
if !ok {
if gb.log.V(FINE) {
gb.log.Infof(
"got state changes for an unknown/replaced SubConn: %p, %v",
sc,
s,
)
}
return
}
gb.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
delete(gb.scRefs, sc)
delete(gb.scStates, sc)
}
if oldS == connectivity.Ready && s != oldS {
// Subconn is broken. Remove fallback mapping to this subconn.
for k, v := range gb.fallbackMap {
if v == sc {
delete(gb.fallbackMap, k)
}
}
}
if oldS != connectivity.Ready && s == connectivity.Ready {
// Remove fallback mapping for the keys of recovered subconn.
for k := range gb.fallbackMap {
if gb.affinityMap[k] == sc {
delete(gb.fallbackMap, k)
}
}
}
oldAggrState := gb.state
gb.state = gb.csEvltr.recordTransition(oldS, s)
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
(gb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
gb.regeneratePicker()
gb.cc.UpdateState(balancer.State{
ConnectivityState: gb.state,
Picker: gb.picker,
})
}
if scRef := gb.scRefs[sc]; scRef != nil {
// Inform of the state change.
close(scRef.stateSignal)
scRef.stateSignal = make(chan struct{})
}
}
// refresh initiates a replacement SubConn for a specific subConnRef and starts
// connecting it. If a refresh is already in progress for the ref, this is a no-op.
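// The intended trigger (a sketch; the check itself lives in the picker, not
// here): when unresponsive detection is enabled and a SubConn has accumulated
// at least UnresponsiveCalls deadline-exceeded calls with no response for
// UnresponsiveDetectionMs, refresh is called on that SubConn's ref to replace
// the underlying connection.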
func (gb *gcpBalancer) refresh(ref *subConnRef) {
if ref.refreshing {
return
}
gb.mu.Lock()
defer gb.mu.Unlock()
if ref.refreshing {
return
}
ref.refreshing = true
sc, err := gb.cc.NewSubConn(
gb.addrs,
balancer.NewSubConnOptions{HealthCheckEnabled: healthCheckEnabled},
)
if err != nil {
gb.log.Errorf("failed to create a replacement SubConn with NewSubConn: %v", err)
return
}
gb.refreshingScRefs[sc] = ref
sc.Connect()
}
// Close is a no-op. The balancer is not required to remove its SubConns;
// gRPC takes care of them when the ClientConn is closed.
func (gb *gcpBalancer) Close() {
}