in grpcgcp/gcp_picker.go [55:119]
func (p *gcpPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if len(p.scRefs) <= 0 {
if p.log.V(FINEST) {
p.log.Info("returning balancer.ErrNoSubConnAvailable as no subconns are available.")
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
ctx := info.Ctx
gcpCtx, hasGCPCtx := ctx.Value(gcpKey).(*gcpContext)
boundKey := ""
locator := ""
var cmd grpc_gcp.AffinityConfig_Command
if mcfg, ok := p.gb.methodCfg[info.FullMethodName]; ok {
locator = mcfg.GetAffinityKey()
cmd = mcfg.GetCommand()
if hasGCPCtx && (cmd == grpc_gcp.AffinityConfig_BOUND || cmd == grpc_gcp.AffinityConfig_UNBIND) {
a, err := getAffinityKeysFromMessage(locator, gcpCtx.reqMsg)
if err != nil {
return balancer.PickResult{}, fmt.Errorf(
"failed to retrieve affinity key from request message: %v", err)
}
boundKey = a[0]
}
}
scRef, err := p.getAndIncrementSubConnRef(info.Ctx, boundKey, cmd)
if err != nil {
return balancer.PickResult{}, err
}
if scRef == nil {
if p.log.V(FINEST) {
p.log.Info("returning balancer.ErrNoSubConnAvailable as no SubConn was picked.")
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
callStarted := time.Now()
// define callback for post process once call is done
callback := func(info balancer.DoneInfo) {
scRef.streamsDecr()
p.detectUnresponsive(ctx, scRef, callStarted, info.Err)
if info.Err != nil {
return
}
switch cmd {
case grpc_gcp.AffinityConfig_BIND:
bindKeys, err := getAffinityKeysFromMessage(locator, gcpCtx.replyMsg)
if err == nil {
for _, bk := range bindKeys {
p.gb.bindSubConn(bk, scRef.subConn)
}
}
case grpc_gcp.AffinityConfig_UNBIND:
p.gb.unbindSubConn(boundKey)
}
}
if p.log.V(FINEST) {
p.log.Infof("picked SubConn: %p", scRef.subConn)
}
return balancer.PickResult{SubConn: scRef.subConn, Done: callback}, nil
}