in xds/balancer/clusterimpl/picker.go [116:203]
// Pick selects a SubConn for one RPC by delegating to the wrapped child
// picker, after applying category drops and circuit breaking.
//
// The steps, in order:
//  1. If the child state is READY, each drop config is consulted. The first one
//     that fires fails the RPC with codes.Unavailable and, when a load store
//     is configured, records a drop under that config's category.
//  2. If a request counter is configured, a request is started against
//     countMax. If that fails the RPC fails with codes.Unavailable and, when a
//     load store is configured, a drop with an empty category is recorded.
//  3. The child picker is invoked. If it returns an error, the request started
//     in step 2 is ended and the child's error is returned.
//  4. On success, the result's Done callback is wrapped so that, when the RPC
//     finishes, the request started in step 2 is ended and the load store (if
//     any) is told about the call's outcome and any reported server load.
func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	// Don't drop unless the inner picker is READY. Similar to
	// https://github.com/grpc/grpc-go/issues/2622.
	if d.s.ConnectivityState == connectivity.Ready {
		// Check if this RPC should be dropped by category.
		for _, dp := range d.drops {
			if dp.drop() {
				if d.loadStore != nil {
					d.loadStore.CallDropped(dp.category)
				}
				return balancer.PickResult{}, status.Error(codes.Unavailable, "RPC is dropped")
			}
		}
	}

	// Check if this RPC should be dropped by circuit breaking.
	if d.counter != nil {
		if err := d.counter.StartRequest(d.countMax); err != nil {
			// Drops by circuit breaking are reported with empty category. They
			// will be reported only in total drops, but not in per category.
			if d.loadStore != nil {
				d.loadStore.CallDropped("")
			}
			// Use status.Error rather than status.Errorf: the error text is
			// data, not a format string, and must not have any '%' in it
			// interpreted as a formatting verb.
			return balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
		}
	}

	var lIDStr string
	pr, err := d.s.Picker.Pick(info)
	if scw, ok := pr.SubConn.(*scWrapper); ok {
		// This OK check also covers the case err!=nil, because SubConn will be
		// nil.
		pr.SubConn = scw.SubConn
		// If locality ID isn't found in the wrapper, an empty locality ID will
		// be used.
		var marshalErr error
		lIDStr, marshalErr = scw.localityID().ToString()
		if marshalErr != nil {
			dubbogoLogger.Infof("failed to marshal LocalityID: %#v, loads won't be reported", scw.localityID())
		}
	}

	if err != nil {
		if d.counter != nil {
			// Release one request count if this pick fails.
			d.counter.EndRequest()
		}
		return pr, err
	}

	if d.loadStore != nil {
		d.loadStore.CallStarted(lIDStr)
		childDone := pr.Done
		pr.Done = func(doneInfo balancer.DoneInfo) {
			if childDone != nil {
				childDone(doneInfo)
			}
			d.loadStore.CallFinished(lIDStr, doneInfo.Err)
			// ServerLoad is an interface value; guard against both a different
			// dynamic type and a typed nil pointer, since the fields are read
			// directly below.
			load, ok := doneInfo.ServerLoad.(*orcapb.OrcaLoadReport)
			if !ok || load == nil {
				return
			}
			d.loadStore.CallServerLoad(lIDStr, serverLoadCPUName, load.CpuUtilization)
			d.loadStore.CallServerLoad(lIDStr, serverLoadMemoryName, load.MemUtilization)
			for name, cost := range load.RequestCost {
				d.loadStore.CallServerLoad(lIDStr, name, cost)
			}
			for name, util := range load.Utilization {
				d.loadStore.CallServerLoad(lIDStr, name, util)
			}
		}
	}

	if d.counter != nil {
		// Update Done() so that when the RPC finishes, the request count will
		// be released. This wrapper is outermost, so the count is released
		// before the load-reporting callback (if any) runs.
		innerDone := pr.Done
		pr.Done = func(doneInfo balancer.DoneInfo) {
			d.counter.EndRequest()
			if innerDone != nil {
				innerDone(doneInfo)
			}
		}
	}

	return pr, nil
}