plugins/input/redis/input_redis.go (205 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package redis import ( "bufio" "errors" "fmt" "net" "net/url" "strconv" "strings" "sync" "time" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/pipeline" ) type InputRedis struct { ServerUrls []string context pipeline.Context } // ## specify servers via a url matching: // ## [protocol://][:password]@address[:port] // ## e.g. // ## tcp://localhost:6379 // ## tcp://:password@192.168.99.100 // ## unix:///var/run/redis.sock // ## // ## If no servers are specified, then localhost is used as the host. // ## If no port is specified, 6379 is used // servers = ["tcp://localhost:6379"] func (r *InputRedis) Init(context pipeline.Context) (int, error) { r.context = context return 0, nil } func (r *InputRedis) Description() string { return "redis input plugin for logtail" } var Tracking = map[string]string{ "uptime_in_seconds": "uptime", "connected_clients": "clients", "role": "replication_role", } type TotalDBInfo struct { keys int64 avgTTL int64 expires int64 dbCount int64 } var ErrProtocolError = errors.New("redis protocol error") const defaultPort = "6379" func (r *InputRedis) Collect(collector pipeline.Collector) error { if len(r.ServerUrls) == 0 { return r.gatherServer(&url.URL{ Scheme: "tcp", Host: ":6379", }, collector) } var wg sync.WaitGroup for _, serv := range r.ServerUrls { if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") { serv = "tcp://" + serv } u, err := url.Parse(serv) if err != nil { logger.Warning(r.context.GetRuntimeContext(), "REDIS_PARSE_ADDRESS_ALARM", "Unable to parse to address", serv, "error", err) continue } else if u.Scheme == "" { // fallback to simple string based address (i.e. "10.0.0.1:10000") u.Scheme = "tcp" u.Host = serv u.Path = "" } if u.Scheme == "tcp" { _, _, err := net.SplitHostPort(u.Host) if err != nil { u.Host = u.Host + ":" + defaultPort } } wg.Add(1) go func(url *url.URL) { defer wg.Done() err := r.gatherServer(url, collector) if err != nil { logger.Error(r.context.GetRuntimeContext(), "REDIS_COLLECT_ALARM", err) } }(u) } wg.Wait() return nil } var defaultTimeout = 5 * time.Second func (r *InputRedis) gatherServer(addr *url.URL, collector pipeline.Collector) error { var address string if addr.Scheme == "unix" { address = addr.Path } else { address = addr.Host } c, err := net.DialTimeout(addr.Scheme, address, defaultTimeout) if err != nil { return fmt.Errorf("Unable to connect to redis server '%s': %s", address, err) } defer func(c net.Conn) { _ = c.Close() }(c) // Extend connection _ = c.SetDeadline(time.Now().Add(defaultTimeout)) if addr.User != nil { pwd, set := addr.User.Password() if set && pwd != "" { _, _ = c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd))) rdr := bufio.NewReader(c) line, err := rdr.ReadString('\n') if err != nil { return err } if line[0] != '+' { return fmt.Errorf("%s", strings.TrimSpace(line)[1:]) } } } _, _ = c.Write([]byte("INFO\r\n")) _, _ = c.Write([]byte("EOF\r\n")) rdr := bufio.NewReader(c) var tags map[string]string if addr.Scheme == "unix" { tags = map[string]string{"socket": addr.Path} } else { // If there's an error, ignore and use 'unknown' tags host, port, err := net.SplitHostPort(addr.Host) if err != nil { host, port = "unknown", "unknown" } tags = map[string]string{"server": host, "port": port} } return gatherInfoOutput(rdr, collector, tags) } // gatherInfoOutput gathers func gatherInfoOutput( rdr *bufio.Reader, collector pipeline.Collector, tags map[string]string, ) error { var section string var dbTotal TotalDBInfo scanner := bufio.NewScanner(rdr) fields := make(map[string]string) for scanner.Scan() { line := scanner.Text() if strings.Contains(line, "ERR") { break } if len(line) == 0 { continue } if line[0] == '#' { if len(line) > 2 { section = line[2:] } continue } parts := strings.SplitN(line, ":", 2) if len(parts) < 2 { continue } name := parts[0] if section == "Server" { if name != "lru_clock" && name != "uptime_in_seconds" && name != "redis_version" { continue } } if name == "mem_allocator" { continue } metric, ok := Tracking[name] if !ok { if section == "Keyspace" { kline := strings.TrimSpace(parts[1]) gatherKeyspaceLine(name, kline, &fields, &dbTotal) continue } metric = name } val := strings.TrimSpace(parts[1]) fields[metric] = val } if dbTotal.dbCount > 0 { dbTotal.avgTTL /= dbTotal.dbCount } fields["total_db_count"] = strconv.FormatInt(dbTotal.dbCount, 10) fields["total_db_avg_ttl"] = strconv.FormatInt(dbTotal.avgTTL, 10) fields["total_db_keys"] = strconv.FormatInt(dbTotal.keys, 10) fields["total_db_expires"] = strconv.FormatInt(dbTotal.expires, 10) collector.AddData(tags, fields) return nil } // Parse the special Keyspace line at end of redis stats // This is a special line that looks something like: // // db0:keys=2,expires=0,avg_ttl=0 // // And there is one for each db on the redis instance func gatherKeyspaceLine(name, line string, fields *map[string]string, dbTotal *TotalDBInfo) { if strings.Contains(line, "keys=") { dbparts := strings.Split(line, ",") for _, dbp := range dbparts { if kv := strings.Split(dbp, "="); len(kv) >= 2 { (*fields)[name+"_"+kv[0]] = kv[1] val, _ := strconv.ParseInt(kv[1], 10, 64) switch kv[0] { case "keys": dbTotal.keys += val case "expires": dbTotal.expires += val case "avg_ttl": dbTotal.avgTTL += val } } } dbTotal.dbCount++ } } func init() { pipeline.MetricInputs["metric_redis"] = func() pipeline.MetricInput { return &InputRedis{} } }