extension/agenthealth/handler/stats/provider/statuscode.go (135 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package provider
import (
"context"
"net/http"
"sync"
"time"
"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/amazon-cloudwatch-agent/extension/agenthealth/handler/stats/agent"
)
const (
statusResetInterval = 5 * time.Minute
statusHandlerID = "cloudwatchagent.StatusCodeHandler"
)
var (
statusCodeProviderSingleton *StatusCodeProvider
StatusCodeProviderOnce sync.Once
)
// StatusCodeProvider handles processing of status codes and maintains stats.
type StatusCodeProvider struct {
currentStats map[string]*[5]int
mu sync.RWMutex
statusCodeChan chan statusCodeEntry
stopChan chan struct{}
resetTicker *time.Ticker
completedStats chan agent.Stats
}
type statusCodeEntry struct {
operation string
statusCode int
}
func GetStatusCodeStatsProvider() *StatusCodeProvider {
StatusCodeProviderOnce.Do(func() {
provider := &StatusCodeProvider{
currentStats: make(map[string]*[5]int),
statusCodeChan: make(chan statusCodeEntry, 1000),
stopChan: make(chan struct{}),
resetTicker: time.NewTicker(statusResetInterval),
completedStats: make(chan agent.Stats, 1), // buffered channel
}
provider.startProcessing()
statusCodeProviderSingleton = provider
})
return statusCodeProviderSingleton
}
func (sp *StatusCodeProvider) startProcessing() {
go func() {
for {
select {
case entry := <-sp.statusCodeChan:
sp.processStatusCode(entry)
case <-sp.resetTicker.C:
sp.RotateStats()
case <-sp.stopChan:
sp.resetTicker.Stop()
return
}
}
}()
}
func (sp *StatusCodeProvider) EnqueueStatusCode(operation string, statusCode int) {
sp.statusCodeChan <- statusCodeEntry{operation: operation, statusCode: statusCode}
}
func (sp *StatusCodeProvider) processStatusCode(entry statusCodeEntry) {
sp.mu.Lock()
defer sp.mu.Unlock()
stats, exists := sp.currentStats[entry.operation]
if !exists {
stats = &[5]int{}
sp.currentStats[entry.operation] = stats
}
switch entry.statusCode {
case 200:
stats[0]++
case 400:
stats[1]++
case 408:
stats[2]++
case 413:
stats[3]++
case 429:
stats[4]++
}
}
func (sp *StatusCodeProvider) RotateStats() {
sp.mu.Lock()
newStats := agent.Stats{
StatusCodes: make(map[string][5]int, len(sp.currentStats)),
}
for op, stats := range sp.currentStats {
newStats.StatusCodes[op] = *stats
}
sp.currentStats = make(map[string]*[5]int)
sp.mu.Unlock()
select {
case existingStats := <-sp.completedStats:
existingStats.Merge(newStats)
newStats = existingStats
default:
}
sp.completedStats <- newStats
}
func (sp *StatusCodeProvider) Stats(_ string) agent.Stats {
select {
case stats := <-sp.completedStats:
return stats
default:
return agent.Stats{}
}
}
type StatusCodeHandler struct {
StatusCodeProvider *StatusCodeProvider
filter agent.OperationsFilter
}
func NewStatusCodeHandler(provider *StatusCodeProvider, filter agent.OperationsFilter) *StatusCodeHandler {
return &StatusCodeHandler{
StatusCodeProvider: provider,
filter: filter,
}
}
func (h *StatusCodeHandler) HandleResponse(ctx context.Context, r *http.Response) {
operation := awsmiddleware.GetOperationName(ctx)
if !h.filter.IsAllowed(operation) {
return
}
operation = agent.GetShortOperationName(operation)
if operation == "" {
return
}
h.StatusCodeProvider.EnqueueStatusCode(operation, r.StatusCode)
}
func (h *StatusCodeHandler) ID() string {
return statusHandlerID
}
func (h *StatusCodeHandler) Position() awsmiddleware.HandlerPosition {
return awsmiddleware.After
}