ptp/ptp4u/stats/stats.go (153 lines of code) (raw):
/*
Copyright (c) Facebook, Inc. and its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package stats implements statistics collection and reporting.
It is used by server to report internal statistics, such as number of
requests and responses.
*/
package stats
import (
"fmt"
"strings"
"sync"
ptp "github.com/facebook/time/ptp/protocol"
)
// Stats is a metric collection interface
type Stats interface {
// Start starts a stat reporter
// Use this for passive reporters
Start(monitoringport int)
// Snapshot the values so they can be reported atomically
Snapshot()
// Reset atomically sets all the counters to 0
Reset()
// IncSubscription atomically add 1 to the counter
IncSubscription(t ptp.MessageType)
// IncRX atomically add 1 to the counter
IncRX(t ptp.MessageType)
// IncTX atomically add 1 to the counter
IncTX(t ptp.MessageType)
// IncRXSignaling atomically add 1 to the counter
IncRXSignaling(t ptp.MessageType)
// IncTXSignaling atomically add 1 to the counter
IncTXSignaling(t ptp.MessageType)
// IncWorkerSubs atomically add 1 to the counter
IncWorkerSubs(workerid int)
// DecSubscription atomically removes 1 from the counter
DecSubscription(t ptp.MessageType)
// DecRX atomically removes 1 from the counter
DecRX(t ptp.MessageType)
// DecTX atomically removes 1 from the counter
DecTX(t ptp.MessageType)
// DecRXSignaling atomically removes 1 from the counter
DecRXSignaling(t ptp.MessageType)
// DecTXSignaling atomically removes 1 from the counter
DecTXSignaling(t ptp.MessageType)
// DecWorkerSubs atomically removes 1 from the counter
DecWorkerSubs(workerid int)
// SetMaxWorkerQueue atomically sets worker queue len
SetMaxWorkerQueue(workerid int, queue int64)
// SetMaxTXTSAttempts atomically sets number of retries for get latest TX timestamp
SetMaxTXTSAttempts(workerid int, retries int64)
// SetUTCOffset atomically sets the utcoffset
SetUTCOffset(utcoffset int64)
// SetDrain atomically sets the drain status
SetDrain(drain int64)
}
// syncMapInt64 sync map of PTP messages
type syncMapInt64 struct {
sync.Mutex
m map[int]int64
}
// init initializes the underlying map
func (s *syncMapInt64) init() {
s.m = make(map[int]int64)
}
// keys returns slice of keys of the underlying map
func (s *syncMapInt64) keys() []int {
keys := make([]int, 0, len(s.m))
s.Lock()
for k := range s.m {
keys = append(keys, k)
}
s.Unlock()
return keys
}
// load gets the value by the key
func (s *syncMapInt64) load(key int) int64 {
s.Lock()
defer s.Unlock()
return s.m[key]
}
// inc increments the counter for the given key
func (s *syncMapInt64) inc(key int) {
s.Lock()
s.m[key]++
s.Unlock()
}
// dec decrements the counter for the given key
func (s *syncMapInt64) dec(key int) {
s.Lock()
s.m[key]--
s.Unlock()
}
// store saves the value with the key
func (s *syncMapInt64) store(key int, value int64) {
s.Lock()
s.m[key] = value
s.Unlock()
}
// copy all key-values between maps
func (s *syncMapInt64) copy(dst *syncMapInt64) {
for _, t := range s.keys() {
dst.store(t, s.load(t))
}
}
// reset stats to 0
func (s *syncMapInt64) reset() {
s.Lock()
for t := range s.m {
s.m[t] = 0
}
s.Unlock()
}
type counters struct {
rx syncMapInt64
rxSignaling syncMapInt64
subscriptions syncMapInt64
tx syncMapInt64
txSignaling syncMapInt64
txtsattempts syncMapInt64
workerQueue syncMapInt64
workerSubs syncMapInt64
utcoffset int64
drain int64
}
func (c *counters) init() {
c.subscriptions.init()
c.rx.init()
c.tx.init()
c.rxSignaling.init()
c.txSignaling.init()
c.workerQueue.init()
c.workerSubs.init()
c.txtsattempts.init()
}
func (c *counters) reset() {
c.subscriptions.reset()
c.rx.reset()
c.tx.reset()
c.rxSignaling.reset()
c.txSignaling.reset()
c.workerQueue.reset()
c.workerSubs.reset()
c.txtsattempts.reset()
c.utcoffset = 0
c.drain = 0
}
// toMap converts counters to a map
func (c *counters) toMap() (export map[string]int64) {
res := make(map[string]int64)
for _, t := range c.subscriptions.keys() {
c := c.subscriptions.load(t)
mt := strings.ToLower(ptp.MessageType(t).String())
res[fmt.Sprintf("subscriptions.%s", mt)] = c
}
for _, t := range c.rx.keys() {
c := c.rx.load(t)
mt := strings.ToLower(ptp.MessageType(t).String())
res[fmt.Sprintf("rx.%s", mt)] = c
}
for _, t := range c.tx.keys() {
c := c.tx.load(t)
mt := strings.ToLower(ptp.MessageType(t).String())
res[fmt.Sprintf("tx.%s", mt)] = c
}
for _, t := range c.rxSignaling.keys() {
c := c.rxSignaling.load(t)
mt := strings.ToLower(ptp.MessageType(t).String())
res[fmt.Sprintf("rx.signaling.%s", mt)] = c
}
for _, t := range c.txSignaling.keys() {
c := c.txSignaling.load(t)
mt := strings.ToLower(ptp.MessageType(t).String())
res[fmt.Sprintf("tx.signaling.%s", mt)] = c
}
for _, t := range c.workerQueue.keys() {
c := c.workerQueue.load(t)
res[fmt.Sprintf("worker.%d.queue", t)] = c
}
for _, t := range c.workerSubs.keys() {
c := c.workerSubs.load(t)
res[fmt.Sprintf("worker.%d.subscriptions", t)] = c
}
for _, t := range c.txtsattempts.keys() {
c := c.txtsattempts.load(t)
res[fmt.Sprintf("worker.%d.txtsattempts", t)] = c
}
res["utcoffset"] = c.utcoffset
res["drain"] = c.drain
return res
}