router/pkg/statistics/engine_stats.go (152 lines of code) (raw):
package statistics
import (
"context"
"sync"
"time"
"go.uber.org/atomic"
"go.uber.org/zap"
)
type EngineStatistics interface {
Subscribe(ctx context.Context) chan *UsageReport
GetReport() *UsageReport
SubscriptionUpdateSent()
ConnectionsInc()
ConnectionsDec()
SubscriptionCountInc(count int)
SubscriptionCountDec(count int)
TriggerCountInc(count int)
TriggerCountDec(count int)
}
type EngineStats struct {
mu sync.Mutex
ctx context.Context
logger *zap.Logger
reportStats bool
connections atomic.Uint64
subscriptions atomic.Uint64
messagesSent atomic.Uint64
triggers atomic.Uint64
update chan struct{}
subscribers map[context.Context]chan *UsageReport
}
type UsageReport struct {
Connections uint64
Subscriptions uint64
MessagesSent uint64
Triggers uint64
}
func NewEngineStats(ctx context.Context, logger *zap.Logger, reportStats bool) *EngineStats {
stats := &EngineStats{
ctx: ctx,
logger: logger,
update: make(chan struct{}),
mu: sync.Mutex{},
reportStats: reportStats,
subscribers: map[context.Context]chan *UsageReport{},
}
go stats.run(ctx)
return stats
}
func (s *EngineStats) Subscribe(ctx context.Context) chan *UsageReport {
s.mu.Lock()
defer s.mu.Unlock()
sub := make(chan *UsageReport)
s.subscribers[ctx] = sub
return sub
}
func (s *EngineStats) GetReport() *UsageReport {
report := &UsageReport{
Connections: s.connections.Load(),
Subscriptions: s.subscriptions.Load(),
MessagesSent: s.messagesSent.Load(),
Triggers: s.triggers.Load(),
}
return report
}
func (s *EngineStats) run(ctx context.Context) {
tickReport := time.NewTicker(time.Second * 5)
if !s.reportStats {
tickReport.Stop()
}
defer tickReport.Stop()
for {
select {
case <-ctx.Done():
return
case <-tickReport.C:
s.reportConnections()
case <-s.update:
s.mu.Lock()
report := s.GetReport()
for ctx, subscriber := range s.subscribers {
select {
case subscriber <- report:
case <-ctx.Done():
delete(s.subscribers, ctx)
continue
case <-s.ctx.Done():
continue
}
}
s.mu.Unlock()
}
}
}
func (s *EngineStats) reportConnections() {
s.logger.Info("WebSocket Stats",
zap.Uint64("open_connections", s.connections.Load()),
zap.Uint64("triggers", s.triggers.Load()),
zap.Uint64("active_subscriptions", s.subscriptions.Load()),
)
}
func (s *EngineStats) publish() {
s.update <- struct{}{}
}
func (s *EngineStats) SubscriptionUpdateSent() {
s.messagesSent.Inc()
s.publish()
}
func (s *EngineStats) ConnectionsInc() {
s.connections.Inc()
s.publish()
}
func (s *EngineStats) ConnectionsDec() {
s.connections.Dec()
s.publish()
}
func (s *EngineStats) SubscriptionCountInc(count int) {
s.subscriptions.Add(uint64(count))
s.publish()
}
func (s *EngineStats) SubscriptionCountDec(count int) {
s.subscriptions.Sub(uint64(count))
s.publish()
}
func (s *EngineStats) TriggerCountInc(count int) {
s.triggers.Add(uint64(count))
s.publish()
}
func (s *EngineStats) TriggerCountDec(count int) {
s.triggers.Sub(uint64(count))
s.publish()
}
type NoopEngineStats struct{}
func NewNoopEngineStats() *NoopEngineStats {
return &NoopEngineStats{}
}
func (s *NoopEngineStats) Subscribe(_ context.Context) chan *UsageReport {
return nil
}
func (s *NoopEngineStats) GetReport() *UsageReport {
return nil
}
func (s *NoopEngineStats) SubscriptionUpdateSent() {}
func (s *NoopEngineStats) ConnectionsInc() {}
func (s *NoopEngineStats) ConnectionsDec() {}
func (s *NoopEngineStats) SubscriptionCountInc(_ int) {}
func (s *NoopEngineStats) SubscriptionCountDec(_ int) {}
func (s *NoopEngineStats) SynchronousSubscriptionsInc() {}
func (s *NoopEngineStats) SynchronousSubscriptionsDec() {}
func (s *NoopEngineStats) TriggerCountInc(count int) {}
func (s *NoopEngineStats) TriggerCountDec(count int) {}
var _ EngineStatistics = &EngineStats{}