in go/pkg/balancer/gcp_picker.go [30:89]
// Pick chooses the SubConn that should carry the RPC described by info.
//
// When the picker holds no subConnRefs it returns
// balancer.ErrNoSubConnAvailable right away; that emptiness check runs before
// the picker mutex is taken. Everything after it runs with p.mu held.
//
// If the RPC context carries a *gcpContext under gcpKey, the picker adopts its
// pool config the first time one is seen and, for BOUND and UNBIND affinity
// commands, extracts the affinity key from the request message. That key (or
// the empty string) is handed to getSubConnRef to select the SubConn.
//
// The returned PickResult carries a Done callback. On a call that finished
// without error it binds the key found in the reply to the picked SubConn
// (BIND) or unbinds the request key (UNBIND); in every case it decrements the
// stream count that Pick incremented.
func (p *gcpPicker) Pick(info balancer.PickInfo) (result balancer.PickResult, err error) {
	if len(p.scRefs) <= 0 {
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	// requestKey stays empty unless the call is BOUND/UNBIND and a key could
	// be read from the request message.
	var requestKey string
	callCtx, hasCallCtx := info.Ctx.Value(gcpKey).(*gcpContext)
	if hasCallCtx {
		if p.poolCfg == nil {
			// Initialize poolConfig for picker.
			p.poolCfg = callCtx.poolCfg
		}
		if cfg := callCtx.affinityCfg; cfg != nil {
			keyPath := cfg.GetAffinityKey()
			switch cfg.GetCommand() {
			case pb.AffinityConfig_BOUND, pb.AffinityConfig_UNBIND:
				key, keyErr := getAffinityKeyFromMessage(keyPath, callCtx.reqMsg)
				if keyErr != nil {
					return balancer.PickResult{}, fmt.Errorf(
						"failed to retrieve affinity key from request message: %v", keyErr)
				}
				requestKey = key
			}
		}
	}

	ref, err := p.getSubConnRef(requestKey)
	if err != nil {
		return balancer.PickResult{}, err
	}

	// Count the stream now; the completion callback below undoes it.
	picked := ref.subConn
	ref.streamsIncr()

	// Post-processing that runs once the call is done.
	onDone := func(done balancer.DoneInfo) {
		if done.Err == nil && hasCallCtx {
			// affinityCfg is not nil-checked here: the getters are called on
			// it directly (presumably nil-safe generated getters — confirm).
			cfg := callCtx.affinityCfg
			keyPath := cfg.GetAffinityKey()
			switch cfg.GetCommand() {
			case pb.AffinityConfig_BIND:
				// A reply without a readable key is skipped silently.
				if replyKey, keyErr := getAffinityKeyFromMessage(keyPath, callCtx.replyMsg); keyErr == nil {
					p.gcpBalancer.bindSubConn(replyKey, ref.subConn)
				}
			case pb.AffinityConfig_UNBIND:
				p.gcpBalancer.unbindSubConn(requestKey)
			}
		}
		ref.streamsDecr()
	}

	return balancer.PickResult{SubConn: picked, Done: onDone}, nil
}