pulsar/internal/memory_limit_controller.go (111 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package internal import ( "context" "sync" "sync/atomic" ) type MemoryLimitController interface { ReserveMemory(ctx context.Context, size int64) bool TryReserveMemory(size int64) bool ForceReserveMemory(size int64) ReleaseMemory(size int64) CurrentUsage() int64 CurrentUsagePercent() float64 IsMemoryLimited() bool RegisterTrigger(trigger func()) } type memoryLimitController struct { currentUsage int64 limit int64 chCond *chCond triggers []*thresholdTrigger // valid range is (0, 1.0) triggerThreshold float64 } type thresholdTrigger struct { triggerFunc func() triggerRunning int32 } func (t *thresholdTrigger) canTryRunning() bool { return atomic.CompareAndSwapInt32(&t.triggerRunning, 0, 1) } func (t *thresholdTrigger) setRunning(isRunning bool) { if isRunning { atomic.StoreInt32(&t.triggerRunning, 1) } else { atomic.StoreInt32(&t.triggerRunning, 0) } } // NewMemoryLimitController threshold valid range is (0, 1.0) func NewMemoryLimitController(limit int64, threshold float64) MemoryLimitController { mlc := &memoryLimitController{ limit: limit, chCond: newCond(&sync.Mutex{}), triggerThreshold: threshold, } return mlc } func (m *memoryLimitController) ReserveMemory(ctx context.Context, size int64) bool { if !m.TryReserveMemory(size) { m.chCond.L.Lock() defer m.chCond.L.Unlock() for !m.TryReserveMemory(size) { if !m.chCond.waitWithContext(ctx) { return false } } } return true } func (m *memoryLimitController) TryReserveMemory(size int64) bool { for { current := atomic.LoadInt64(&m.currentUsage) newUsage := current + size // This condition means we allowed one request to go over the limit. if m.IsMemoryLimited() && current > m.limit { return false } if atomic.CompareAndSwapInt64(&m.currentUsage, current, newUsage) { m.checkTrigger(current, newUsage) return true } } } func (m *memoryLimitController) ForceReserveMemory(size int64) { nextUsage := atomic.AddInt64(&m.currentUsage, size) prevUsage := nextUsage - size m.checkTrigger(prevUsage, nextUsage) } func (m *memoryLimitController) ReleaseMemory(size int64) { newUsage := atomic.AddInt64(&m.currentUsage, -size) if newUsage+size > m.limit && newUsage <= m.limit { m.chCond.broadcast() } } func (m *memoryLimitController) CurrentUsage() int64 { return atomic.LoadInt64(&m.currentUsage) } func (m *memoryLimitController) CurrentUsagePercent() float64 { return float64(atomic.LoadInt64(&m.currentUsage)) / float64(m.limit) } func (m *memoryLimitController) IsMemoryLimited() bool { return m.limit > 0 } func (m *memoryLimitController) RegisterTrigger(trigger func()) { m.chCond.L.Lock() defer m.chCond.L.Unlock() m.triggers = append(m.triggers, &thresholdTrigger{ triggerFunc: trigger, }) } func (m *memoryLimitController) checkTrigger(prevUsage int64, nextUsage int64) { nextUsagePercent := float64(nextUsage) / float64(m.limit) prevUsagePercent := float64(prevUsage) / float64(m.limit) if nextUsagePercent >= m.triggerThreshold && prevUsagePercent < m.triggerThreshold { for _, trigger := range m.triggers { if trigger.canTryRunning() { go func(trigger *thresholdTrigger) { trigger.triggerFunc() trigger.setRunning(false) }(trigger) } } } }