tracker/peerhandoutpolicy/peerhandoutpolicy.go (66 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package peerhandoutpolicy import ( "fmt" "sort" "github.com/uber-go/tally" "github.com/uber/kraken/core" ) type peerPriorityInfo struct { peer *core.PeerInfo priority int label string } // assignmentPolicy defines the policy for assigning priority to peers. type assignmentPolicy interface { assignPriority(peer *core.PeerInfo) (priority int, label string) } // PriorityPolicy wraps an assignmentPolicy and uses it to sort lists of peers. type PriorityPolicy struct { stats tally.Scope policy assignmentPolicy } // NewPriorityPolicy returns a PriorityPolicy that assigns priorities using the given priority policy. func NewPriorityPolicy(stats tally.Scope, priorityPolicy string) (*PriorityPolicy, error) { p := &PriorityPolicy{ stats: stats.Tagged(map[string]string{ "module": "peerhandoutpolicy", "priority": priorityPolicy, }), } switch priorityPolicy { case _defaultPolicy: p.policy = newDefaultAssignmentPolicy() case _completenessPolicy: p.policy = newCompletenessAssignmentPolicy() default: return nil, fmt.Errorf("priority policy %q not found", priorityPolicy) } return p, nil } // SortPeers returns the given list of peers sorted by the priority assigned to them // by the priorityPolicy. Excludes the source peer from the list. func (p *PriorityPolicy) SortPeers(source *core.PeerInfo, peers []*core.PeerInfo) []*core.PeerInfo { peerPriorities := make([]*peerPriorityInfo, 0, len(peers)) for k := 0; k < len(peers); k++ { if peers[k] != source { priority, label := p.policy.assignPriority(peers[k]) peerPriorities = append(peerPriorities, &peerPriorityInfo{peers[k], priority, label}) } } sort.Slice(peerPriorities, func(i, j int) bool { return peerPriorities[i].priority < peerPriorities[j].priority }) priorityCounts := make(map[string]int) for k := 0; k < len(peerPriorities); k++ { p := peerPriorities[k] peers[k] = p.peer if _, ok := priorityCounts[p.label]; ok { priorityCounts[p.label]++ } else { priorityCounts[p.label] = 1 } } peers = peers[:len(peerPriorities)] for label, count := range priorityCounts { p.stats.Tagged(map[string]string{ "label": label, }).Gauge("count").Update(float64(count)) } return peers }