metricbeat/module/redis/redis.go (162 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. /* Package redis contains shared Redis functionality for the metric sets */ package redis import ( "strings" "time" rd "github.com/gomodule/redigo/redis" "github.com/elastic/elastic-agent-libs/logp" ) // Redis types const ( TypeNone = "none" TypeString = "string" TypeList = "list" TypeSet = "set" TypeSortedSet = "zset" TypeHash = "hash" ) // ParseRedisInfo parses the string returned by the INFO command // Every line is split up into key and value func ParseRedisInfo(info string) map[string]string { // Feed every line into result := strings.Split(info, "\r\n") // Load redis info values into array values := map[string]string{} for _, value := range result { // Values are separated by : parts := ParseRedisLine(value, ":") if len(parts) == 2 { if strings.Contains(parts[0], "cmdstat_") { cmdstats := ParseRedisCommandStats(parts[0], parts[1]) for k, v := range cmdstats { key := parts[0] + "_" + k values[key] = v } } else { values[parts[0]] = parts[1] } } } return values } // ParseRedisLine parses a single line returned by INFO func ParseRedisLine(s, delimiter string) []string { return strings.Split(s, delimiter) } // ParseRedisCommandStats parses a map of stats returned by INFO COMMANDSTATS func ParseRedisCommandStats(key, s string) map[string]string { // calls=XX,usec=XXX,usec_per_call=XXX results := strings.Split(s, ",") values := map[string]string{} for _, value := range results { parts := strings.Split(value, "=") if len(parts) == 2 { values[parts[0]] = parts[1] } } return values } // FetchRedisInfo returns a map of requested stats. func FetchRedisInfo(stat string, c rd.Conn, logger *logp.Logger) (map[string]string, error) { out, err := rd.String(c.Do("INFO", stat)) if err != nil { logger.Errorf("Error retrieving INFO stats: %v", err) return nil, err } return ParseRedisInfo(out), nil } // FetchSlowLogLength returns count of slow operations func FetchSlowLogLength(c rd.Conn, logger *logp.Logger) (int64, error) { count, err := rd.Int64(c.Do("SLOWLOG", "len")) if err != nil { logger.Errorf("Error retrieving slowlog len: %v", err) return 0, err } return count, nil } // FetchKeyInfo collects info about a key func FetchKeyInfo(c rd.Conn, key string, logger *logp.Logger) (map[string]interface{}, error) { keyType, err := rd.String(c.Do("TYPE", key)) if err != nil { return nil, err } if keyType == TypeNone { // Ignore it, it has been removed return nil, nil } keyTTL, err := rd.Int64(c.Do("TTL", key)) if err != nil { return nil, err } info := map[string]interface{}{ "name": key, "type": keyType, "expire": map[string]interface{}{ "ttl": keyTTL, }, } lenCommand := "" switch keyType { case TypeString: lenCommand = "STRLEN" case TypeList: lenCommand = "LLEN" case TypeSet: lenCommand = "SCARD" case TypeSortedSet: lenCommand = "ZCARD" case TypeHash: lenCommand = "HLEN" default: logger.Named("redis").Debugf("Not supported length for type %s", keyType) } if lenCommand != "" { length, err := rd.Int64(c.Do(lenCommand, key)) if err != nil { return nil, err } info["length"] = length } return info, nil } // FetchKeys gets a list of keys based on a pattern using SCAN, `limit` is a // safeguard to limit the number of commands executed and the number of keys // returned, if more than `limit` keys are being collected the method stops // and returns the keys already collected. Setting `limit` to ' disables this // limit. func FetchKeys(c rd.Conn, pattern string, limit uint) ([]string, error) { cursor := 0 var keys []string for { resp, err := rd.Values(c.Do("SCAN", cursor, "MATCH", pattern)) if err != nil { return nil, err } var scanKeys []string _, err = rd.Scan(resp, &cursor, &scanKeys) if err != nil { return nil, err } keys = append(keys, scanKeys...) if cursor == 0 || (limit > 0 && len(keys) > int(limit)) { break } } return keys, nil } // Select selects the keyspace to use for this connection func Select(c rd.Conn, keyspace uint) error { _, err := c.Do("SELECT", keyspace) return err } // Pool is a redis pool that keeps track of the database number originally configured type Pool struct { *rd.Pool dbNumber int } // DBNumber returns the db number originally used to configure this pool func (p *Pool) DBNumber() int { return p.dbNumber } // CreatePool creates a redis connection pool func CreatePool(host, username, password string, dbNumber int, config *Config, connTimeout time.Duration) *Pool { pool := &rd.Pool{ MaxIdle: config.MaxConn, IdleTimeout: config.IdleTimeout, Dial: func() (rd.Conn, error) { dialOptions := []rd.DialOption{ rd.DialUsername(username), rd.DialPassword(password), rd.DialDatabase(dbNumber), rd.DialConnectTimeout(connTimeout), rd.DialReadTimeout(connTimeout), rd.DialWriteTimeout(connTimeout), } if config.TLS.IsEnabled() { dialOptions = append(dialOptions, rd.DialUseTLS(true), rd.DialTLSConfig(config.UseTLSConfig), ) } return rd.Dial(config.Network, host, dialOptions...) }, } return &Pool{Pool: pool, dbNumber: dbNumber} }