banyand/protector/protector.go (152 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package protector provides a set of protectors that stop the query services when the resource usage exceeds the limit.
package protector

import (
	"context"
	"errors"
	"fmt"
	"runtime/metrics"
	"sync/atomic"
	"time"

	"github.com/dustin/go-humanize"

	"github.com/apache/skywalking-banyandb/banyand/observability"
	"github.com/apache/skywalking-banyandb/pkg/cgroups"
	"github.com/apache/skywalking-banyandb/pkg/logger"
	"github.com/apache/skywalking-banyandb/pkg/meter"
	"github.com/apache/skywalking-banyandb/pkg/run"
)

// scope is the metrics sub-scope ("memory_protector") under which the
// memory protector registers its "limit" and "usage" gauges.
var scope = observability.RootScope.SubScope("memory_protector")

// Memory is a protector that stops the query services when the memory usage exceeds the limit.
//
// AcquireResource makes callers wait while the last sampled Go runtime memory
// usage plus the requested size exceeds the limit; Serve refreshes that sample
// in the background. A limit of 0 disables the protector entirely.
type Memory struct {
	// omr is the metrics registry the gauges are created from.
	omr observability.MetricsRegistry
	// limitGauge publishes the effective limit computed in PreRun.
	limitGauge meter.Gauge
	// usageGauge is the "usage" gauge registered under the memory_protector scope.
	usageGauge meter.Gauge
	// l is the protector's logger; it is initialised in PreRun.
	l *logger.Logger
	// closed is closed by GracefulStop and returned by Serve as its stop notification.
	closed chan struct{}
	// blockedChan is a bounded set of slots (capacity set in NewMemory) that
	// limits how many AcquireResource callers may wait on the limit at once.
	blockedChan chan struct{}
	// allowedPercent is the --allowed-percent flag: the share of the cgroup
	// memory limit used as the limit when allowedBytes is not positive.
	allowedPercent int
	// allowedBytes is the --allowed-bytes flag: when positive it is used as the limit directly.
	allowedBytes run.Bytes
	// limit is the effective limit in bytes; 0 means the protector is disabled.
	limit uint64
	// usage is the last sampled memory usage in bytes; it is read and written
	// with sync/atomic because the sampler goroutine and callers share it.
	usage uint64
}

// NewMemory creates a new Memory protector whose gauges are registered through omr.
// The protector stays disabled (limit 0) until PreRun computes a limit.
func NewMemory(omr observability.MetricsRegistry) *Memory { queueSize := cgroups.CPUs() factory := omr.With(scope) return &Memory{ omr: omr, blockedChan: make(chan struct{}, queueSize), closed: make(chan struct{}), limitGauge: factory.NewGauge("limit"), usageGauge: factory.NewGauge("usage"), } } // AcquireResource attempts to acquire a `size` amount of memory. func (m *Memory) AcquireResource(ctx context.Context, size uint64) error { if m.limit == 0 { return nil } start := time.Now() select { case m.blockedChan <- struct{}{}: defer func() { <-m.blockedChan }() case <-ctx.Done(): return fmt.Errorf("context canceled while waiting for blocked queue slot: %w", ctx.Err()) } for { currentUsage := atomic.LoadUint64(&m.usage) if currentUsage+size <= m.limit { return nil } select { case <-time.After(100 * time.Millisecond): continue case <-ctx.Done(): return fmt.Errorf( "context canceled: memory acquisition failed (currentUsage: %d, limit: %d, size: %d, blockedDuration: %v): %w", currentUsage, m.limit, size, time.Since(start), ctx.Err(), ) } } } // Name returns the name of the protector. func (m *Memory) Name() string { return "memory-protector" } // FlagSet returns the flag set for the protector. func (m *Memory) FlagSet() *run.FlagSet { flagS := run.NewFlagSet(m.Name()) flagS.IntVarP(&m.allowedPercent, "allowed-percent", "", 75, "Allowed bytes of memory usage. If the memory usage exceeds this value, the query services will stop. "+ "Setting a large value may evict data from the OS page cache, causing high disk I/O.") flagS.VarP(&m.allowedBytes, "allowed-bytes", "", "Allowed percentage of total memory usage. If usage exceeds this value, the query services will stop. "+ "This takes effect only if `allowed-bytes` is 0. If usage is too high, it may cause OS page cache eviction.") return flagS } // Validate validates the protector's flags. 
func (m *Memory) Validate() error { if m.allowedPercent <= 0 || m.allowedPercent > 100 { if m.allowedBytes <= 0 { return errors.New("allowed-bytes must be greater than 0") } return errors.New("allowed-percent must be in the range (0, 100]") } return nil } // PreRun initializes the protector. func (m *Memory) PreRun(context.Context) error { m.l = logger.GetLogger(m.Name()) if m.allowedBytes > 0 { m.limit = uint64(m.allowedBytes) m.l.Info(). Str("limit", humanize.Bytes(m.limit)). Msg("memory protector enabled") } else { cgLimit, err := cgroups.MemoryLimit() if err != nil { m.l.Warn().Err(err).Msg("failed to get memory limit from cgroups, disable memory protector") return nil } if cgLimit <= 0 || cgLimit > 1e18 { m.l.Warn().Int64("cgroup_memory_limit", cgLimit).Msg("cgroup memory limit is invalid, disable memory protector") return nil } m.limit = uint64(cgLimit) * uint64(m.allowedPercent) / 100 m.l.Info(). Str("limit", humanize.Bytes(m.limit)). Str("cgroup_limit", humanize.Bytes(uint64(cgLimit))). Int("percent", m.allowedPercent). Msg("memory protector enabled") } m.limitGauge.Set(float64(m.limit)) return nil } // GracefulStop stops the protector. func (m *Memory) GracefulStop() { close(m.closed) } // Serve starts the protector. 
func (m *Memory) Serve() run.StopNotify { if m.limit == 0 { return m.closed } go func() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-m.closed: return case <-ticker.C: samples := []metrics.Sample{ {Name: "/memory/classes/heap/objects:bytes"}, {Name: "/memory/classes/heap/stacks:bytes"}, {Name: "/memory/classes/metadata/mcache/inuse:bytes"}, {Name: "/memory/classes/metadata/mspan/inuse:bytes"}, {Name: "/memory/classes/metadata/other:bytes"}, {Name: "/memory/classes/os-stacks:bytes"}, {Name: "/memory/classes/other:bytes"}, } metrics.Read(samples) var usedBytes uint64 for _, sample := range samples { usedBytes += sample.Value.Uint64() } atomic.StoreUint64(&m.usage, usedBytes) if usedBytes > m.limit { m.l.Warn().Str("used", humanize.Bytes(usedBytes)).Str("limit", humanize.Bytes(m.limit)).Msg("memory usage exceeds limit") } } } }() return m.closed }