ingestor/cluster/health.go (117 lines of code) (raw):

package cluster import ( "context" "sync" "time" ) const ( ReasonLargeUploadQueue = "LargeUploadQueue" ReasonLargeTransferQueue = "LargeTransferQueue" ReasonMaxSegmentsExceeded = "MaxSegmentsExceeded" ReasonMaxDiskUsageExceeded = "MaxDiskUsageExceeded" ) // Health tracks the health of peers in the cluster. If a peer is overloaded, it will be marked as unhealthy // which will cause the service to stop sending writes to that peer for timeout period. Similarly, the // of the current peer is tracked here and if it is unhealthy, the service will stop accepting writes. type Health struct { opts HealthOpts QueueSizer QueueSizer mu sync.RWMutex state map[string]*HealthStatus } type HealthStatus struct { Healthy bool NextCheck time.Time } type HealthOpts struct { // UnhealthyTimeout is the amount of time to wait before marking a peer as healthy. UnhealthyTimeout time.Duration QueueSizer QueueSizer MaxSegmentCount int64 MaxDiskUsage int64 } type PeerHealthReporter interface { IsPeerHealthy(peer string) bool SetPeerUnhealthy(peer string) SetPeerHealthy(peer string) } type QueueSizer interface { TransferQueueSize() int UploadQueueSize() int SegmentsTotal() int64 SegmentsSize() int64 MaxSegmentAge() time.Duration } func NewHealth(opts HealthOpts) *Health { if opts.UnhealthyTimeout.Seconds() == 0 { opts.UnhealthyTimeout = time.Minute } h := &Health{ opts: opts, QueueSizer: opts.QueueSizer, state: make(map[string]*HealthStatus), } return h } func (h *Health) Open(ctx context.Context) error { return nil } func (h *Health) Close() error { return nil } func (h *Health) IsHealthy() bool { return h.UnhealthyReason() == "" } func (h *Health) UnhealthyReason() string { segmentsTotal := h.QueueSizer.SegmentsTotal() segmentsSize := h.QueueSizer.SegmentsSize() if segmentsTotal >= h.opts.MaxSegmentCount { return ReasonMaxSegmentsExceeded } if segmentsSize >= h.opts.MaxDiskUsage { return ReasonMaxDiskUsageExceeded } return "" } func (h *Health) IsPeerHealthy(peer string) bool { h.mu.RLock() defer h.mu.RUnlock() s := h.state[peer] // We don't know about this peer, so assume it's healthy. if s == nil { return true } return s.Healthy || (s.NextCheck.IsZero() || time.Now().UTC().After(s.NextCheck)) } func (h *Health) SetPeerUnhealthy(peer string) { h.mu.Lock() defer h.mu.Unlock() s := h.state[peer] if s == nil { s = &HealthStatus{} } s.Healthy = false s.NextCheck = time.Now().UTC().Add(h.opts.UnhealthyTimeout) h.state[peer] = s } func (h *Health) SetPeerHealthy(peer string) { h.mu.Lock() defer h.mu.Unlock() s := h.state[peer] if s == nil { s = &HealthStatus{} } s.Healthy = true s.NextCheck = time.Time{} h.state[peer] = s } func (h *Health) UploadQueueSize() int { return h.QueueSizer.UploadQueueSize() } func (h *Health) TransferQueueSize() int { return h.QueueSizer.TransferQueueSize() } func (h *Health) SegmentsTotal() int64 { return h.QueueSizer.SegmentsTotal() } func (h *Health) SegmentsSize() int64 { return h.QueueSizer.SegmentsSize() } func (h *Health) MaxSegmentAge() time.Duration { return h.QueueSizer.MaxSegmentAge() }