plugins/server/grpc/server_limiter.go (82 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package grpc import ( "context" "fmt" "sync/atomic" "time" "github.com/apache/skywalking-satellite/internal/satellite/telemetry" "github.com/sirupsen/logrus" "github.com/shirou/gopsutil/cpu" "github.com/apache/skywalking-satellite/internal/pkg/log" ) const CPUUpdateInterval = time.Second * 5 type AcceptConnectionConfig struct { CPUUtilization float64 `mapstructure:"cpu_utilization"` // The max CPU utilization limit ConnectionCount int32 `mapstructure:"connection_count"` // The max connection count } type AcceptLimiter struct { Config AcceptConnectionConfig ActiveConnection int32 CurrentCPU float64 logger *logrus.Entry telemetry.Counter } func NewAcceptLimiter(config AcceptConnectionConfig) (*AcceptLimiter, error) { limiter := &AcceptLimiter{Config: config} if err := limiter.init(); err != nil { return nil, err } return limiter, nil } func (a *AcceptLimiter) init() error { ctx := context.Background() // init logger a.logger = log.Logger. WithField("client-name", Name). WithField("component", "accept-limiter") // cpu analyzer adaptor if err := a.cpuUsage(ctx); err != nil { return fmt.Errorf("could not find cpu usage analyzer: %v", err) } // init telemetry telemetry.NewGauge("grpc_server_cpu_gauge", "The cpu usage of satellite process", func() float64 { return a.CurrentCPU }) telemetry.NewGauge("grpc_server_connection_count", "The active connection count of gRPC server", func() float64 { return float64(a.ActiveConnection) }) return nil } func (a *AcceptLimiter) cpuUsage(ctx context.Context) error { if _, err := cpu.Times(false); err != nil { return err } go func() { for { select { case <-ctx.Done(): return default: if percent, err := cpu.Percent(CPUUpdateInterval, false); err != nil { a.logger.Warnf("error query the cpu usage: %v", err) } else { a.CurrentCPU = percent[0] } } } }() return nil } func (a *AcceptLimiter) CouldHandleConnection() bool { // cpu check if a.CurrentCPU >= a.Config.CPUUtilization { return false } // active connection check if a.ActiveConnection >= a.Config.ConnectionCount { return false } // try to add the active count if atomic.AddInt32(&a.ActiveConnection, 1) > a.Config.ConnectionCount { atomic.AddInt32(&a.ActiveConnection, -1) return false } return true } func (a *AcceptLimiter) CloseConnection() { atomic.AddInt32(&a.ActiveConnection, -1) }