func()

in go/pkg/balancer/gcp_picker.go [30:89]


// Pick selects the SubConn that should carry the RPC described by info.
//
// When the RPC context carries a *gcpContext (stored under gcpKey), the
// picker adopts that context's pool config the first time it sees one. For
// BOUND and UNBIND affinity commands it also extracts the affinity key from
// the request message and passes it to getSubConnRef.
//
// On success the returned PickResult holds the chosen SubConn and a Done
// callback. The SubConn's stream counter is incremented before Pick returns
// and decremented when Done runs. If the RPC finished without error, Done
// additionally binds the key found in the reply message to the SubConn (BIND)
// or removes the binding of the request key (UNBIND).
//
// Pick returns balancer.ErrNoSubConnAvailable when the picker holds no
// SubConn refs, a descriptive error when the affinity key cannot be read from
// the request message, and otherwise whatever error getSubConnRef reports.
func (p *gcpPicker) Pick(info balancer.PickInfo) (result balancer.PickResult, err error) {
	// Take the lock before looking at any picker state so the emptiness check
	// is covered by the same mutex as the rest of the method.
	p.mu.Lock()
	defer p.mu.Unlock()

	if len(p.scRefs) == 0 {
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	gcpCtx, hasGcpCtx := info.Ctx.Value(gcpKey).(*gcpContext)
	boundKey := ""

	if hasGcpCtx {
		if p.poolCfg == nil {
			// Initialize poolConfig for picker.
			p.poolCfg = gcpCtx.poolCfg
		}
		if affinity := gcpCtx.affinityCfg; affinity != nil {
			switch affinity.GetCommand() {
			case pb.AffinityConfig_BOUND, pb.AffinityConfig_UNBIND:
				// Both commands refer to an existing binding, so the key is
				// taken from the request message.
				key, keyErr := getAffinityKeyFromMessage(affinity.GetAffinityKey(), gcpCtx.reqMsg)
				if keyErr != nil {
					return balancer.PickResult{}, fmt.Errorf(
						"failed to retrieve affinity key from request message: %v", keyErr)
				}
				boundKey = key
			}
		}
	}

	scRef, err := p.getSubConnRef(boundKey)
	if err != nil {
		return balancer.PickResult{}, err
	}
	result.SubConn = scRef.subConn
	scRef.streamsIncr()

	// Post-processing once the call is done.
	result.Done = func(done balancer.DoneInfo) {
		// The stream counter is released for every finished RPC, after any
		// bind/unbind bookkeeping below.
		defer scRef.streamsDecr()

		if done.Err != nil || !hasGcpCtx {
			return
		}
		// Same nil guard as in Pick itself: without an affinity config there
		// is no command to act on.
		affinity := gcpCtx.affinityCfg
		if affinity == nil {
			return
		}
		switch affinity.GetCommand() {
		case pb.AffinityConfig_BIND:
			// Best effort: if the reply carries no usable key, no binding is
			// recorded and the RPC result is left untouched.
			bindKey, bindErr := getAffinityKeyFromMessage(affinity.GetAffinityKey(), gcpCtx.replyMsg)
			if bindErr == nil {
				p.gcpBalancer.bindSubConn(bindKey, scRef.subConn)
			}
		case pb.AffinityConfig_UNBIND:
			p.gcpBalancer.unbindSubConn(boundKey)
		}
	}
	return result, nil
}