stats/tally.go (139 lines of code) (raw):

package stats import ( "sync" "time" "github.com/uber/tchannel-go" "github.com/uber-go/tally" ) type wrapper struct { sync.RWMutex scope tally.Scope byTags map[knownTags]*taggedScope } type knownTags struct { dest string source string procedure string retryCount string } type taggedScope struct { sync.RWMutex scope tally.Scope // already tagged with some set of tags counters map[string]tally.Counter gauges map[string]tally.Gauge timers map[string]tally.Timer } // NewTallyReporter takes a tally.Scope and wraps it so it ca be used as a // StatsReporter. The list of metrics emitted is documented on: // https://tchannel.readthedocs.io/en/latest/metrics/ // The metrics emitted are similar to YARPC, the tags emitted are: // source, dest, procedure, and retry-count. func NewTallyReporter(scope tally.Scope) tchannel.StatsReporter { return &wrapper{ scope: scope, byTags: make(map[knownTags]*taggedScope), } } func (w *wrapper) IncCounter(name string, tags map[string]string, value int64) { ts := w.getTaggedScope(tags) ts.getCounter(name).Inc(value) } func (w *wrapper) UpdateGauge(name string, tags map[string]string, value int64) { ts := w.getTaggedScope(tags) ts.getGauge(name).Update(float64(value)) } func (w *wrapper) RecordTimer(name string, tags map[string]string, d time.Duration) { ts := w.getTaggedScope(tags) ts.getTimer(name).Record(d) } func (w *wrapper) getTaggedScope(tags map[string]string) *taggedScope { kt := convertTags(tags) w.RLock() ts, ok := w.byTags[kt] w.RUnlock() if ok { return ts } w.Lock() defer w.Unlock() // Always double-check under the write-lock. if ts, ok := w.byTags[kt]; ok { return ts } ts = &taggedScope{ scope: w.scope.Tagged(kt.tallyTags()), counters: make(map[string]tally.Counter), gauges: make(map[string]tally.Gauge), timers: make(map[string]tally.Timer), } w.byTags[kt] = ts return ts } func convertTags(tags map[string]string) knownTags { if ts, ok := tags["target-service"]; ok { // Outbound call. return knownTags{ dest: ts, source: tags["service"], procedure: tags["target-endpoint"], retryCount: tags["retry-count"], } } if cs, ok := tags["calling-service"]; ok { // Inbound call. return knownTags{ dest: tags["service"], source: cs, procedure: tags["endpoint"], retryCount: tags["retry-count"], } } // TChannel doesn't use any other tags, so ignore all others for now. return knownTags{} } // Create a sub-scope for this set of known tags. func (kt knownTags) tallyTags() map[string]string { tallyTags := make(map[string]string, 5) if kt.dest != "" { tallyTags["dest"] = kt.dest } if kt.source != "" { tallyTags["source"] = kt.source } if kt.procedure != "" { tallyTags["procedure"] = kt.procedure } if kt.retryCount != "" { tallyTags["retry-count"] = kt.retryCount } return tallyTags } func (ts *taggedScope) getCounter(name string) tally.Counter { ts.RLock() counter, ok := ts.counters[name] ts.RUnlock() if ok { return counter } ts.Lock() defer ts.Unlock() // No double-check under the lock, as overwriting the counter has // no impact. counter = ts.scope.Counter(name) ts.counters[name] = counter return counter } func (ts *taggedScope) getGauge(name string) tally.Gauge { ts.RLock() gauge, ok := ts.gauges[name] ts.RUnlock() if ok { return gauge } ts.Lock() defer ts.Unlock() // No double-check under the lock, as overwriting the counter has // no impact. gauge = ts.scope.Gauge(name) ts.gauges[name] = gauge return gauge } func (ts *taggedScope) getTimer(name string) tally.Timer { ts.RLock() timer, ok := ts.timers[name] ts.RUnlock() if ok { return timer } ts.Lock() defer ts.Unlock() // No double-check under the lock, as overwriting the counter has // no impact. timer = ts.scope.Timer(name) ts.timers[name] = timer return timer }