grpcgcp/gcp_picker.go

/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpcgcp

import (
	"context"
	"fmt"
	"reflect"
	"strings"
	"sync"
	"time"

	"github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/status"
)

// Deadline exceeded gRPC error caused by the client-side context reaching its deadline.
var deErr = status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())

func newGCPPicker(readySCRefs []*subConnRef, gb *gcpBalancer) balancer.Picker {
	gp := &gcpPicker{
		gb:     gb,
		scRefs: readySCRefs,
	}
	gp.log = NewGCPLogger(gb.log, fmt.Sprintf("[gcpPicker %p]", gp))
	return gp
}

type gcpPicker struct {
	gb     *gcpBalancer
	mu     sync.Mutex
	scRefs []*subConnRef
	log    grpclog.LoggerV2
}

func (p *gcpPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	if len(p.scRefs) <= 0 {
		if p.log.V(FINEST) {
			p.log.Info("returning balancer.ErrNoSubConnAvailable as no subconns are available.")
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	ctx := info.Ctx
	gcpCtx, hasGCPCtx := ctx.Value(gcpKey).(*gcpContext)
	boundKey := ""
	locator := ""
	var cmd grpc_gcp.AffinityConfig_Command
	if mcfg, ok := p.gb.methodCfg[info.FullMethodName]; ok {
		locator = mcfg.GetAffinityKey()
		cmd = mcfg.GetCommand()
		if hasGCPCtx && (cmd == grpc_gcp.AffinityConfig_BOUND || cmd == grpc_gcp.AffinityConfig_UNBIND) {
			a, err := getAffinityKeysFromMessage(locator, gcpCtx.reqMsg)
			if err != nil {
				return balancer.PickResult{}, fmt.Errorf(
					"failed to retrieve affinity key from request message: %v", err)
			}
			boundKey = a[0]
		}
	}

	scRef, err := p.getAndIncrementSubConnRef(info.Ctx, boundKey, cmd)
	if err != nil {
		return balancer.PickResult{}, err
	}
	if scRef == nil {
		if p.log.V(FINEST) {
			p.log.Info("returning balancer.ErrNoSubConnAvailable as no SubConn was picked.")
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	callStarted := time.Now()

	// Define the callback for post-processing once the call is done.
	callback := func(info balancer.DoneInfo) {
		scRef.streamsDecr()
		p.detectUnresponsive(ctx, scRef, callStarted, info.Err)
		if info.Err != nil {
			return
		}
		switch cmd {
		case grpc_gcp.AffinityConfig_BIND:
			bindKeys, err := getAffinityKeysFromMessage(locator, gcpCtx.replyMsg)
			if err == nil {
				for _, bk := range bindKeys {
					p.gb.bindSubConn(bk, scRef.subConn)
				}
			}
		case grpc_gcp.AffinityConfig_UNBIND:
			p.gb.unbindSubConn(boundKey)
		}
	}

	if p.log.V(FINEST) {
		p.log.Infof("picked SubConn: %p", scRef.subConn)
	}
	return balancer.PickResult{SubConn: scRef.subConn, Done: callback}, nil
}
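// Illustrative affinity flow for Pick above (the method names and locator are
// hypothetical, not part of this package): suppose the balancer's method config
// maps "/svc.Sessions/CreateSession" to command BIND with affinity key locator
// "name", and "/svc.Sessions/GetSession" to command BOUND with the same
// locator. When CreateSession completes without error, the Done callback
// extracts the reply's "name" value(s) and binds them to the SubConn that
// served the call via bindSubConn. A later GetSession pick (with the
// gcpContext carrying the request attached to the call context) extracts
// "name" from the request message and is routed to that same SubConn by
// getReadySubConnRef; a method configured as UNBIND removes the mapping in the
// callback via unbindSubConn.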
// unresponsiveWindow returns the channel pool's unresponsiveDetectionMs
// multiplied by 2^(refresh count since last response) as a time.Duration.
// This provides exponential backoff when RPCs keep hitting deadline exceeded
// after consecutive reconnections.
func (p *gcpPicker) unresponsiveWindow(scRef *subConnRef) time.Duration {
	factor := uint32(1 << scRef.refreshCnt)
	return time.Millisecond * time.Duration(factor*p.gb.cfg.GetChannelPool().GetUnresponsiveDetectionMs())
}

func (p *gcpPicker) detectUnresponsive(ctx context.Context, scRef *subConnRef, callStarted time.Time, rpcErr error) {
	if !p.gb.unresponsiveDetection {
		return
	}
	// Treat the call as answered by the server unless the deadline exceeded error
	// was caused by the client-side context reaching its deadline.
	if dl, ok := ctx.Deadline(); rpcErr == nil || status.Code(rpcErr) != codes.DeadlineExceeded ||
		rpcErr.Error() != deErr.Error() || !ok || dl.After(time.Now()) {
		scRef.gotResp()
		return
	}
	if callStarted.Before(scRef.lastResp) {
		return
	}
	// Increment the deadline exceeded call count and check whether there were
	// enough deadline exceeded calls and enough time passed since the last
	// response to trigger a refresh.
	if scRef.deCallsInc() >= p.gb.cfg.GetChannelPool().GetUnresponsiveCalls() &&
		scRef.lastResp.Before(time.Now().Add(-p.unresponsiveWindow(scRef))) {
		p.gb.refresh(scRef)
	}
}

func (p *gcpPicker) getAndIncrementSubConnRef(ctx context.Context, boundKey string, cmd grpc_gcp.AffinityConfig_Command) (*subConnRef, error) {
	if cmd == grpc_gcp.AffinityConfig_BIND &&
		p.gb.cfg.GetChannelPool().GetBindPickStrategy() == grpc_gcp.ChannelPoolConfig_ROUND_ROBIN {
		scRef := p.gb.getSubConnRoundRobin(ctx)
		if p.log.V(FINEST) {
			p.log.Infof("picking SubConn for round-robin bind: %p", scRef.subConn)
		}
		scRef.streamsIncr()
		return scRef, nil
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	scRef, err := p.getSubConnRef(boundKey)
	if err != nil {
		return nil, err
	}
	if scRef != nil {
		scRef.streamsIncr()
	}
	return scRef, nil
}

// getSubConnRef returns the subConnRef object that contains the subconn
// ready to be used by the picker.
// Must be called holding the picker mutex lock.
func (p *gcpPicker) getSubConnRef(boundKey string) (*subConnRef, error) {
	if boundKey != "" {
		if ref, ok := p.gb.getReadySubConnRef(boundKey); ok {
			return ref, nil
		}
	}
	return p.getLeastBusySubConnRef()
}

// Must be called holding the picker mutex lock.
func (p *gcpPicker) getLeastBusySubConnRef() (*subConnRef, error) {
	minScRef := p.scRefs[0]
	minStreamsCnt := minScRef.getStreamsCnt()
	for _, scRef := range p.scRefs {
		if scRef.getStreamsCnt() < minStreamsCnt {
			minStreamsCnt = scRef.getStreamsCnt()
			minScRef = scRef
		}
	}

	// If the least busy connection still has capacity, use it.
	if minStreamsCnt < int32(p.gb.cfg.GetChannelPool().GetMaxConcurrentStreamsLowWatermark()) {
		return minScRef, nil
	}

	if p.gb.cfg.GetChannelPool().GetMaxSize() == 0 ||
		p.gb.getConnectionPoolSize() < int(p.gb.cfg.GetChannelPool().GetMaxSize()) {
		// Ask the balancer to create a new subconn when all current subconns are busy
		// and the connection pool still has capacity (either unlimited or maxSize is
		// not reached).
		p.gb.newSubConn()
		// Let this picker return ErrNoSubConnAvailable because it needs some time
		// for the subconn to be READY.
		return nil, balancer.ErrNoSubConnAvailable
	}

	// If the pool has no capacity left and every connection has reached the soft
	// limit, pick the least busy one anyway.
	return minScRef, nil
}
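// Illustrative pool sizing behavior for getLeastBusySubConnRef above (the
// config values are hypothetical, not defaults): with
// MaxConcurrentStreamsLowWatermark == 100 and MaxSize == 4, a pick returns the
// least busy ready SubConn as long as it has fewer than 100 active streams.
// Once every ready SubConn is at or above 100 streams and the pool holds fewer
// than 4 connections, the balancer is asked for a new SubConn and the pick
// returns balancer.ErrNoSubConnAvailable until that SubConn is READY. When the
// pool already holds 4 connections and all of them are at the soft limit, the
// least busy SubConn is returned anyway. A MaxSize of 0 means the pool may
// grow without limit.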
func keysFromMessage(val reflect.Value, path []string, start int) ([]string, error) {
	if val.Kind() == reflect.Pointer || val.Kind() == reflect.Interface {
		val = val.Elem()
	}
	if len(path) == start {
		if val.Kind() != reflect.String {
			return nil, fmt.Errorf("cannot get string value from %q which is %q", strings.Join(path, "."), val.Kind())
		}
		return []string{val.String()}, nil
	}
	if val.Kind() != reflect.Struct {
		return nil, fmt.Errorf("path %q traversal error: cannot lookup field %q (index %d in the path) in a %q value",
			strings.Join(path, "."), path[start], start, val.Kind())
	}
	valField := val.FieldByName(strings.Title(path[start]))
	if valField.Kind() != reflect.Slice {
		return keysFromMessage(valField, path, start+1)
	}
	keys := []string{}
	for i := 0; i < valField.Len(); i++ {
		kk, err := keysFromMessage(valField.Index(i), path, start+1)
		if err != nil {
			return keys, err
		}
		keys = append(keys, kk...)
	}
	return keys, nil
}

// getAffinityKeysFromMessage retrieves the affinity key(s) from a proto message
// using the key locator defined in the affinity config.
func getAffinityKeysFromMessage(
	locator string,
	msg interface{},
) (affinityKeys []string, err error) {
	names := strings.Split(locator, ".")
	if len(names) == 0 {
		return nil, fmt.Errorf("empty affinityKey locator")
	}
	return keysFromMessage(reflect.ValueOf(msg), names, 0)
}

// newErrPicker returns a picker that always returns err on Pick().
func newErrPicker(err error) balancer.Picker {
	return &errPicker{err: err}
}

type errPicker struct {
	err error // Pick() always returns this err.
}

func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	return balancer.PickResult{}, p.err
}
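// Illustrative example for getAffinityKeysFromMessage/keysFromMessage above
// (the request type and locator are hypothetical, not part of this package):
// given
//
//	type Session struct{ Name string }
//	type GetSessionRequest struct{ Session *Session }
//
// and the affinity key locator "session.name", the locator is split on "." and
// each segment is title-cased to the exported field name, so the reflection
// walk resolves req.Session.Name and returns []string{req.Session.Name}. If an
// intermediate field is a slice, keys are collected from every element; if the
// final field is not a string, an error is returned.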