ptp/ptp4u/stats/json.go (107 lines of code) (raw):
/*
Copyright (c) Facebook, Inc. and its affiliates.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package stats
import (
"encoding/json"
"fmt"
"net/http"
"sync/atomic"
ptp "github.com/facebook/time/ptp/protocol"
log "github.com/sirupsen/logrus"
)
// JSONStats collects counters and exposes them as JSON over HTTP.
//
// It holds two sets of counters: the embedded set is the live one updated by
// the Inc*/Dec*/Set* methods, and report is the copy filled in by Snapshot
// and serialized by handleRequest.
type JSONStats struct {
// report is the snapshot served to HTTP clients; it is written by Snapshot.
report counters
// counters is the live set; embedding promotes its fields and methods.
counters
}
// NewJSONStats allocates a JSONStats and runs init on both the live
// (embedded) counters and the report copy before handing it back.
func NewJSONStats() *JSONStats {
	stats := new(JSONStats)
	stats.counters.init()
	stats.report.init()
	return stats
}
// Start serves the stats handler on "/" over HTTP on the given port, bound
// to all interfaces. The call blocks while the server is running; when
// ListenAndServe returns an error it is logged with log.Fatalf.
//
// NOTE(review): http.ListenAndServe uses a server without read/write
// timeouts; consider an explicit http.Server if this port is reachable by
// untrusted clients.
func (s *JSONStats) Start(monitoringport int) {
	addr := fmt.Sprintf(":%d", monitoringport)

	mux := http.NewServeMux()
	mux.HandleFunc("/", s.handleRequest)

	log.Infof("Starting http json server on %s", addr)
	if err := http.ListenAndServe(addr, mux); err != nil {
		log.Fatalf("Failed to start listener: %v", err)
	}
}
// Snapshot copies the live counters into s.report, which is the set that
// handleRequest serializes. Each live counter's copy method is called with
// the matching report field as its argument, then the scalar utcoffset and
// drain values are transferred.
func (s *JSONStats) Snapshot() {
	s.subscriptions.copy(&s.report.subscriptions)
	s.rx.copy(&s.report.rx)
	s.tx.copy(&s.report.tx)
	s.rxSignaling.copy(&s.report.rxSignaling)
	s.txSignaling.copy(&s.report.txSignaling)
	s.workerQueue.copy(&s.report.workerQueue)
	s.workerSubs.copy(&s.report.workerSubs)
	s.txtsattempts.copy(&s.report.txtsattempts)

	// SetUTCOffset and SetDrain write these fields with atomic.StoreInt64,
	// so they have to be read with atomic loads too; a plain read here
	// would be a data race with those setters.
	s.report.utcoffset = atomic.LoadInt64(&s.utcoffset)
	s.report.drain = atomic.LoadInt64(&s.drain)
}
// handleRequest answers every monitoring request with the report counters
// encoded as a JSON object. A marshalling failure is returned to the client
// as a 500; a failure while writing the body is only logged.
func (s *JSONStats) handleRequest(w http.ResponseWriter, r *http.Request) {
	payload, err := json.Marshal(s.report.toMap())
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	_, err = w.Write(payload)
	if err != nil {
		log.Errorf("Failed to reply: %v", err)
	}
}
// Reset calls reset on the live (embedded) counters. The report copy is not
// touched here; it changes only on the next Snapshot.
func (s *JSONStats) Reset() {
	s.counters.reset()
}
// IncSubscription increments the subscriptions counter keyed by message
// type t. It delegates to the counter's inc method, defined elsewhere in
// this package.
func (s *JSONStats) IncSubscription(t ptp.MessageType) {
	msgType := int(t)
	s.counters.subscriptions.inc(msgType)
}
// IncRX increments the rx counter keyed by message type t. It delegates to
// the counter's inc method, defined elsewhere in this package.
func (s *JSONStats) IncRX(t ptp.MessageType) {
	msgType := int(t)
	s.counters.rx.inc(msgType)
}
// IncTX increments the tx counter keyed by message type t. It delegates to
// the counter's inc method, defined elsewhere in this package.
func (s *JSONStats) IncTX(t ptp.MessageType) {
	msgType := int(t)
	s.counters.tx.inc(msgType)
}
// IncRXSignaling increments the rxSignaling counter keyed by message type
// t. It delegates to the counter's inc method, defined elsewhere in this
// package.
func (s *JSONStats) IncRXSignaling(t ptp.MessageType) {
	msgType := int(t)
	s.counters.rxSignaling.inc(msgType)
}
// IncTXSignaling increments the txSignaling counter keyed by message type
// t. It delegates to the counter's inc method, defined elsewhere in this
// package.
func (s *JSONStats) IncTXSignaling(t ptp.MessageType) {
	msgType := int(t)
	s.counters.txSignaling.inc(msgType)
}
// IncWorkerSubs increments the workerSubs counter keyed by workerid. It
// delegates to the counter's inc method, defined elsewhere in this package.
func (s *JSONStats) IncWorkerSubs(workerid int) {
	s.counters.workerSubs.inc(workerid)
}
// DecSubscription decrements the subscriptions counter keyed by message
// type t. It delegates to the counter's dec method, defined elsewhere in
// this package.
func (s *JSONStats) DecSubscription(t ptp.MessageType) {
	msgType := int(t)
	s.counters.subscriptions.dec(msgType)
}
// DecRX decrements the rx counter keyed by message type t. It delegates to
// the counter's dec method, defined elsewhere in this package.
func (s *JSONStats) DecRX(t ptp.MessageType) {
	msgType := int(t)
	s.counters.rx.dec(msgType)
}
// DecTX decrements the tx counter keyed by message type t. It delegates to
// the counter's dec method, defined elsewhere in this package.
func (s *JSONStats) DecTX(t ptp.MessageType) {
	msgType := int(t)
	s.counters.tx.dec(msgType)
}
// DecRXSignaling decrements the rxSignaling counter keyed by message type
// t. It delegates to the counter's dec method, defined elsewhere in this
// package.
func (s *JSONStats) DecRXSignaling(t ptp.MessageType) {
	msgType := int(t)
	s.counters.rxSignaling.dec(msgType)
}
// DecTXSignaling decrements the txSignaling counter keyed by message type
// t. It delegates to the counter's dec method, defined elsewhere in this
// package.
func (s *JSONStats) DecTXSignaling(t ptp.MessageType) {
	msgType := int(t)
	s.counters.txSignaling.dec(msgType)
}
// DecWorkerSubs decrements the workerSubs counter keyed by workerid. It
// delegates to the counter's dec method, defined elsewhere in this package.
func (s *JSONStats) DecWorkerSubs(workerid int) {
	s.counters.workerSubs.dec(workerid)
}
// SetMaxWorkerQueue records queue as the worker's queue length only when it
// exceeds the value currently stored for workerid, so the stored value acts
// as a running maximum.
//
// The comparison and the store are two separate calls, not one atomic step.
// NOTE(review): presumably each worker only updates its own id; confirm
// against callers.
func (s *JSONStats) SetMaxWorkerQueue(workerid int, queue int64) {
	if current := s.workerQueue.load(workerid); queue <= current {
		return
	}
	s.workerQueue.store(workerid, queue)
}
// SetMaxTXTSAttempts records attempts (retries needed to get the latest TX
// timestamp) only when it exceeds the value currently stored for workerid,
// so the stored value acts as a running maximum.
//
// The comparison and the store are two separate calls, not one atomic step.
// NOTE(review): presumably each worker only updates its own id; confirm
// against callers.
func (s *JSONStats) SetMaxTXTSAttempts(workerid int, attempts int64) {
	if current := s.txtsattempts.load(workerid); attempts <= current {
		return
	}
	s.txtsattempts.store(workerid, attempts)
}
// SetUTCOffset stores utcoffset into the live counters with an atomic
// 64-bit store.
func (s *JSONStats) SetUTCOffset(utcoffset int64) {
	target := &s.counters.utcoffset
	atomic.StoreInt64(target, utcoffset)
}
// SetDrain stores the drain status into the live counters with an atomic
// 64-bit store.
func (s *JSONStats) SetDrain(drain int64) {
	target := &s.counters.drain
	atomic.StoreInt64(target, drain)
}