func()

in grpcgcp/gcp_picker.go [55:119]


// Pick chooses the SubConn that will carry the RPC described by info.
//
// If the picker holds no SubConn refs, or getAndIncrementSubConnRef yields
// none, balancer.ErrNoSubConnAvailable is returned.
//
// When p.gb.methodCfg has an entry for info.FullMethodName, its affinity
// command drives the key handling:
//   - BOUND / UNBIND: if a *gcpContext is attached to info.Ctx, the affinity
//     key is read from the request message and handed to
//     getAndIncrementSubConnRef. A failure to read it fails the pick.
//   - BIND: after the call completes without error, keys read from the reply
//     message are bound to the picked SubConn.
//   - UNBIND: after the call completes without error, the key is unbound.
//
// The returned PickResult.Done callback always calls scRef.streamsDecr and
// p.detectUnresponsive before doing any affinity post-processing.
func (p *gcpPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	if len(p.scRefs) == 0 {
		if p.log.V(FINEST) {
			p.log.Info("returning balancer.ErrNoSubConnAvailable as no subconns are available.")
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	ctx := info.Ctx
	gcpCtx, ok := ctx.Value(gcpKey).(*gcpContext)
	// Treat a typed-nil *gcpContext the same as a missing one so that no
	// later field access can dereference nil.
	hasGCPCtx := ok && gcpCtx != nil

	boundKey := ""
	locator := ""
	var cmd grpc_gcp.AffinityConfig_Command

	if mcfg, ok := p.gb.methodCfg[info.FullMethodName]; ok {
		locator = mcfg.GetAffinityKey()
		cmd = mcfg.GetCommand()
		if hasGCPCtx && (cmd == grpc_gcp.AffinityConfig_BOUND || cmd == grpc_gcp.AffinityConfig_UNBIND) {
			keys, err := getAffinityKeysFromMessage(locator, gcpCtx.reqMsg)
			if err != nil {
				return balancer.PickResult{}, fmt.Errorf(
					"failed to retrieve affinity key from request message: %v", err)
			}
			// Defensive: the helper's contract is not visible here, so do not
			// assume a nil error implies at least one key.
			if len(keys) == 0 {
				return balancer.PickResult{}, fmt.Errorf(
					"failed to retrieve affinity key from request message: no keys found for locator %q", locator)
			}
			boundKey = keys[0]
		}
	}

	scRef, err := p.getAndIncrementSubConnRef(ctx, boundKey, cmd)
	if err != nil {
		return balancer.PickResult{}, err
	}
	if scRef == nil {
		if p.log.V(FINEST) {
			p.log.Info("returning balancer.ErrNoSubConnAvailable as no SubConn was picked.")
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}

	callStarted := time.Now()
	// Post-processing to run once the call is done.
	callback := func(done balancer.DoneInfo) {
		scRef.streamsDecr()
		p.detectUnresponsive(ctx, scRef, callStarted, done.Err)
		if done.Err != nil {
			// Affinity bindings are only updated for successful calls.
			return
		}

		switch cmd {
		case grpc_gcp.AffinityConfig_BIND:
			if !hasGCPCtx {
				// Without a gcpContext there is no reply message to read
				// affinity keys from; skip binding instead of dereferencing nil.
				return
			}
			bindKeys, err := getAffinityKeysFromMessage(locator, gcpCtx.replyMsg)
			if err != nil {
				// Binding is best-effort: the call itself already succeeded.
				if p.log.V(FINEST) {
					p.log.Infof("failed to retrieve affinity keys from reply message: %v", err)
				}
				return
			}
			for _, bk := range bindKeys {
				p.gb.bindSubConn(bk, scRef.subConn)
			}
		case grpc_gcp.AffinityConfig_UNBIND:
			p.gb.unbindSubConn(boundKey)
		}
	}

	if p.log.V(FINEST) {
		p.log.Infof("picked SubConn: %p", scRef.subConn)
	}
	return balancer.PickResult{SubConn: scRef.subConn, Done: callback}, nil
}