pkg/rules/redigo/redigo_otel_conn.go (94 lines of code) (raw):

// Copyright (c) 2024 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package redigo import ( "container/list" "context" "github.com/gomodule/redigo/redis" "os" "strconv" "time" ) const max_queue_length = 2048 var configuredQueueLength int var commandQueue = list.New() var redigoInstrumenter = BuildRedigoInstrumenter() type armsConn struct { redis.Conn endpoint string ctx context.Context } func (a *armsConn) Close() error { return a.Conn.Close() } func (a *armsConn) Err() error { return a.Conn.Err() } func (a *armsConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) { req := &redigoRequest{ args: args, endpoint: a.endpoint, cmd: commandName, } ctx := a.ctx if ctx == nil { ctx = context.Background() } startTime := time.Now() reply, err = a.Conn.Do(commandName, args...) endTime := time.Now() redigoInstrumenter.StartAndEnd(ctx, req, nil, err, startTime, endTime) return } func (a *armsConn) Send(commandName string, args ...interface{}) error { now := time.Now() req := &redigoRequest{ args: args, endpoint: a.endpoint, cmd: commandName, startTime: now, } ctx := a.ctx if ctx == nil { ctx = context.Background() } req.ctx = ctx push(req) return a.Conn.Send(commandName, args...) } func (a *armsConn) Flush() error { return a.Conn.Flush() } func (a *armsConn) Receive() (reply interface{}, err error) { reply, err = a.Conn.Receive() req := pop() if req != nil { now := time.Now() redigoInstrumenter.StartAndEnd(req.ctx, req, nil, err, req.startTime, now) } return } func push(request *redigoRequest) { if commandQueue != nil && commandQueue.Len() > getMaxQueueLength() { return } commandQueue.PushBack(request) } func pop() *redigoRequest { front := commandQueue.Front() commandQueue.Remove(front) p, ok := front.Value.(*redigoRequest) if ok { return p } return nil } func getMaxQueueLength() int { if configuredQueueLength == 0 { var e = os.Getenv("MAX_REDIGO_QUEUE_LENGTH") if e != "" { configuredQueueLength, _ = strconv.Atoi(os.Getenv(e)) } else { configuredQueueLength = max_queue_length } } return configuredQueueLength }