breakdown.go (216 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apm // import "go.elastic.co/apm/v2"

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"go.elastic.co/apm/v2/model"
)

const (
	// breakdownMetricsLimit is the maximum number of breakdown metric
	// buckets to accumulate per reporting period. Metrics are broken
	// down by {transactionType, transactionName, spanType, spanSubtype}
	// tuples.
	//
	// It is also the number of entries preallocated for each
	// breakdownMetricsMap by newBreakdownMetricsMap.
	breakdownMetricsLimit = 1000

	// appSpanType is the special span type associated with transactions,
	// for reporting transaction self-time.
	appSpanType = "app"

	// Breakdown metric names: the keys of the samples emitted for each
	// breakdown bucket by breakdownMetrics.gather. The count sample holds
	// the number of occurrences; the sum sample holds the accumulated
	// self-time, converted with durationMicros.
	spanSelfTimeCountMetricName = "span.self_time.count"
	spanSelfTimeSumMetricName   = "span.self_time.sum.us"
)

var (
	// breakdownMetricsLimitWarning is the warning text describing that
	// breakdownMetricsLimit has been reached, with the limit formatted
	// into the message. The [1:] slice drops the first character of the
	// raw string literal.
	//
	// NOTE(review): the literal is on a single line here, so the dropped
	// character is a space; presumably the literal originally began with
	// a newline and spanned several lines -- confirm against the upstream
	// layout before relying on the exact message text.
	breakdownMetricsLimitWarning = fmt.Sprintf(` The limit of %d breakdown metricsets has been reached, no new metricsets will be created. Try to name your transactions so that there are less distinct transaction names.`[1:],
		breakdownMetricsLimit,
	)
)

// spanTimingsKey identifies a span type and subtype, for use as the key in
// spanTimingsMap.
type spanTimingsKey struct {
	spanType    string
	spanSubtype string
}

// spanTiming aggregates the occurrences of a single {spanType, spanSubtype}
// pair within the context of a transaction group: count is the number of
// occurrences, and duration is the sum of their durations, stored as the
// int64 value of a time.Duration.
type spanTiming struct {
	duration int64
	count    uint64
}

// spanTimingsMap holds the span timings of a transaction group, keyed by
// span type and subtype.
type spanTimingsMap map[spanTimingsKey]spanTiming

// add folds one occurrence of the {spanType, spanSubtype} pair, lasting d,
// into m. A pair that has not been seen before starts from the zero
// spanTiming.
func (m spanTimingsMap) add(spanType, spanSubtype string, d time.Duration) {
	key := spanTimingsKey{spanType: spanType, spanSubtype: spanSubtype}
	prev := m[key] // zero value when the pair has not been recorded yet
	m[key] = spanTiming{
		duration: prev.duration + int64(d),
		count:    prev.count + 1,
	}
}

// reset removes every timing from m, returning it to its initial zero
// state. The map itself is kept, so it can be refilled with add.
func (m spanTimingsMap) reset() {
	for key := range m {
		delete(m, key)
	}
}

// breakdownMetrics holds a pair of breakdown metrics maps. The "active" map
// accumulates new breakdown metrics, and is swapped with the "inactive" map
// just prior to when metrics gathering begins. When metrics gathering
// completes, the inactive map will be empty.
//
// breakdownMetrics may be written to concurrently by the tracer, and any
// number of other goroutines when a transaction cannot be enqueued.
type breakdownMetrics struct { enabled bool mu sync.RWMutex active, inactive *breakdownMetricsMap } func newBreakdownMetrics() *breakdownMetrics { return &breakdownMetrics{ active: newBreakdownMetricsMap(), inactive: newBreakdownMetricsMap(), } } type breakdownMetricsMap struct { mu sync.RWMutex m map[uint64][]*breakdownMetricsMapEntry space []breakdownMetricsMapEntry entries int } func newBreakdownMetricsMap() *breakdownMetricsMap { return &breakdownMetricsMap{ m: make(map[uint64][]*breakdownMetricsMapEntry), space: make([]breakdownMetricsMapEntry, breakdownMetricsLimit), } } type breakdownMetricsMapEntry struct { breakdownMetricsKey breakdownTiming } // breakdownMetricsKey identifies a transaction group, and optionally a // spanTimingsKey, for recording transaction and span breakdown metrics. type breakdownMetricsKey struct { transactionType string transactionName string spanTimingsKey } func (k breakdownMetricsKey) hash() uint64 { h := newFnv1a() h.add(k.transactionType) h.add(k.transactionName) if k.spanType != "" { h.add(k.spanType) } if k.spanSubtype != "" { h.add(k.spanSubtype) } return uint64(h) } // breakdownTiming holds breakdown metrics. type breakdownTiming struct { // span holds the "span.self_time" metric values. span spanTiming } func (lhs *breakdownTiming) accumulate(rhs breakdownTiming) { atomic.AddUint64(&lhs.span.count, rhs.span.count) atomic.AddInt64(&lhs.span.duration, rhs.span.duration) } // recordTransaction records breakdown metrics for td into m. // // recordTransaction returns true if breakdown metrics were // completely recorded, and false if any metrics were not // recorded due to the limit being reached. 
func (m *breakdownMetrics) recordTransaction(td *TransactionData) bool { m.mu.RLock() defer m.mu.RUnlock() k := breakdownMetricsKey{ transactionType: td.Type, transactionName: td.Name, spanTimingsKey: spanTimingsKey{ spanType: appSpanType, }, } var transactionSpanTiming spanTiming if td.breakdownMetricsEnabled { endTime := td.timestamp.Add(td.Duration) transactionSelfTime := td.Duration - td.childrenTimer.finalDuration(endTime) transactionSpanTiming = spanTiming{count: 1, duration: int64(transactionSelfTime)} } if !m.active.record(k, breakdownTiming{ span: transactionSpanTiming, }) { // We couldn't record the transaction's metricset, so we won't // be able to record spans for that transaction either. return false } ok := true for sk, timing := range td.spanTimings { k.spanTimingsKey = sk ok = ok && m.active.record(k, breakdownTiming{span: timing}) } return ok } // record records a single breakdown metric, identified by k. func (m *breakdownMetricsMap) record(k breakdownMetricsKey, bt breakdownTiming) bool { hash := k.hash() m.mu.RLock() entries, ok := m.m[hash] m.mu.RUnlock() var offset int if ok { for offset = range entries { if entries[offset].breakdownMetricsKey == k { // The append may reallocate the entries, but the // entries are pointers into m.activeSpace. Therefore, // entries' timings can safely be atomically incremented // without holding the read lock. 
entries[offset].breakdownTiming.accumulate(bt) return true } } offset++ // where to start searching with the write lock below } m.mu.Lock() entries, ok = m.m[hash] if ok { for i := range entries[offset:] { if entries[offset+i].breakdownMetricsKey == k { m.mu.Unlock() entries[offset+i].breakdownTiming.accumulate(bt) return true } } } else if m.entries >= breakdownMetricsLimit { m.mu.Unlock() return false } entry := &m.space[m.entries] *entry = breakdownMetricsMapEntry{ breakdownTiming: bt, breakdownMetricsKey: k, } m.m[hash] = append(entries, entry) m.entries++ m.mu.Unlock() return true } // gather is called by builtinMetricsGatherer to gather breakdown metrics. func (m *breakdownMetrics) gather(out *Metrics) { // Hold m.mu only long enough to swap m.active and m.inactive. // This will be blocked by metric updates, but that's OK; only // metrics gathering will be delayed. After swapping we do not // need to hold m.mu, since nothing concurrently accesses // m.inactive while the gatherer is iterating over it. m.mu.Lock() m.active, m.inactive = m.inactive, m.active m.mu.Unlock() for hash, entries := range m.inactive.m { for _, entry := range entries { if entry.span.count > 0 { out.transactionGroupMetrics = append(out.transactionGroupMetrics, &model.Metrics{ Transaction: model.MetricsTransaction{ Type: entry.transactionType, Name: entry.transactionName, }, Span: model.MetricsSpan{ Type: entry.spanType, Subtype: entry.spanSubtype, }, Samples: map[string]model.Metric{ spanSelfTimeCountMetricName: { Value: float64(entry.span.count), }, spanSelfTimeSumMetricName: { Value: durationMicros(time.Duration(entry.span.duration)), }, }, }) } entry.breakdownMetricsKey = breakdownMetricsKey{} // release strings } delete(m.inactive.m, hash) } m.inactive.entries = 0 } // childrenTimer tracks time spent by children of a transaction or span. // // childrenTimer is not goroutine-safe. type childrenTimer struct { // active holds the number active children. 
active int // start holds the timestamp at which active went from zero to one. start time.Time // totalDuration holds the total duration of time periods in which // at least one child was active. totalDuration time.Duration } func (t *childrenTimer) childStarted(start time.Time) { t.active++ if t.active == 1 { t.start = start } } func (t *childrenTimer) childEnded(end time.Time) { t.active-- if t.active == 0 { t.totalDuration += end.Sub(t.start) } } func (t *childrenTimer) finalDuration(end time.Time) time.Duration { if t.active > 0 { t.active = 0 t.totalDuration += end.Sub(t.start) } return t.totalDuration }