router/pkg/statistics/engine_stats.go (115 lines of code) (raw):
package statistics
import (
"context"
"sync"
"time"
"go.uber.org/atomic"
"go.uber.org/zap"
)
type EngineStatistics interface {
GetReport() *UsageReport
SubscriptionUpdateSent()
ConnectionsInc()
ConnectionsDec()
SubscriptionCountInc(count int)
SubscriptionCountDec(count int)
TriggerCountInc(count int)
TriggerCountDec(count int)
}
type EngineStats struct {
mu sync.Mutex
ctx context.Context
logger *zap.Logger
reportStats bool
connections atomic.Uint64
subscriptions atomic.Uint64
messagesSent atomic.Uint64
triggers atomic.Uint64
}
type UsageReport struct {
Connections uint64
Subscriptions uint64
MessagesSent uint64
Triggers uint64
}
// NewEngineStats creates a new EngineStats instance. If reportStats is true, the stats will be reported every 5 seconds.
func NewEngineStats(ctx context.Context, logger *zap.Logger, reportStats bool) *EngineStats {
stats := &EngineStats{
ctx: ctx,
logger: logger,
mu: sync.Mutex{},
reportStats: reportStats,
}
if reportStats {
go stats.runReporter(ctx)
}
return stats
}
func (s *EngineStats) GetReport() *UsageReport {
report := &UsageReport{
Connections: s.connections.Load(),
Subscriptions: s.subscriptions.Load(),
MessagesSent: s.messagesSent.Load(),
Triggers: s.triggers.Load(),
}
return report
}
func (s *EngineStats) runReporter(ctx context.Context) {
tickReport := time.NewTicker(time.Second * 5)
defer tickReport.Stop()
for {
select {
case <-ctx.Done():
return
case <-tickReport.C:
s.reportConnections()
}
}
}
func (s *EngineStats) reportConnections() {
s.logger.Info("WebSocket Stats",
zap.Uint64("open_connections", s.connections.Load()),
zap.Uint64("triggers", s.triggers.Load()),
zap.Uint64("active_subscriptions", s.subscriptions.Load()),
)
}
func (s *EngineStats) SubscriptionUpdateSent() {
s.messagesSent.Inc()
}
func (s *EngineStats) ConnectionsInc() {
s.connections.Inc()
}
func (s *EngineStats) ConnectionsDec() {
s.connections.Dec()
}
func (s *EngineStats) SubscriptionCountInc(count int) {
s.subscriptions.Add(uint64(count))
}
func (s *EngineStats) SubscriptionCountDec(count int) {
s.subscriptions.Sub(uint64(count))
}
func (s *EngineStats) TriggerCountInc(count int) {
s.triggers.Add(uint64(count))
}
func (s *EngineStats) TriggerCountDec(count int) {
s.triggers.Sub(uint64(count))
}
type NoopEngineStats struct{}
func NewNoopEngineStats() *NoopEngineStats {
return &NoopEngineStats{}
}
func (s *NoopEngineStats) Subscribe(_ context.Context) chan *UsageReport {
return nil
}
func (s *NoopEngineStats) GetReport() *UsageReport {
return nil
}
func (s *NoopEngineStats) SubscriptionUpdateSent() {}
func (s *NoopEngineStats) ConnectionsInc() {}
func (s *NoopEngineStats) ConnectionsDec() {}
func (s *NoopEngineStats) SubscriptionCountInc(_ int) {}
func (s *NoopEngineStats) SubscriptionCountDec(_ int) {}
func (s *NoopEngineStats) SynchronousSubscriptionsInc() {}
func (s *NoopEngineStats) SynchronousSubscriptionsDec() {}
func (s *NoopEngineStats) TriggerCountInc(count int) {}
func (s *NoopEngineStats) TriggerCountDec(count int) {}
var _ EngineStatistics = &EngineStats{}