metricbeat/module/redis/key/key.go (91 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package key
import (
"fmt"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
"github.com/elastic/beats/v7/metricbeat/module/redis"
)
var hostParser = parse.URLHostParserBuilder{DefaultScheme: "redis"}.Build()
func init() {
mb.Registry.MustAddMetricSet("redis", "key", New,
mb.WithHostParser(hostParser),
)
}
// MetricSet for fetching Redis server information and statistics.
type MetricSet struct {
*redis.MetricSet
patterns []KeyPattern
}
// KeyPattern contains the information required to query keys
type KeyPattern struct {
Keyspace *uint `config:"keyspace"`
Pattern string `config:"pattern" validate:"required"`
Limit uint `config:"limit"`
}
// New creates new instance of MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := struct {
Patterns []KeyPattern `config:"key.patterns" validate:"nonzero,required"`
}{}
err := base.Module().UnpackConfig(&config)
if err != nil {
return nil, fmt.Errorf("failed to read configuration for 'key' metricset: %w", err)
}
ms, err := redis.NewMetricSet(base)
if err != nil {
return nil, fmt.Errorf("failed to create 'key' metricset: %w", err)
}
return &MetricSet{
MetricSet: ms,
patterns: config.Patterns,
}, nil
}
// Fetch fetches information from Redis keys
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
conn := m.Connection()
defer func() {
if err := conn.Close(); err != nil {
m.Logger().Debug(fmt.Errorf("failed to release connection: %w", err))
}
}()
for _, p := range m.patterns {
var keyspace uint
if p.Keyspace == nil {
keyspace = m.OriginalDBNumber()
} else {
keyspace = *p.Keyspace
}
if err := redis.Select(conn, keyspace); err != nil {
msg := fmt.Errorf("Failed to select keyspace %d: %w", keyspace, err)
m.Logger().Error(msg)
r.Error(err)
continue
}
keys, err := redis.FetchKeys(conn, p.Pattern, p.Limit)
if err != nil {
msg := fmt.Errorf("Failed to list keys in keyspace %d with pattern '%s': %w", keyspace, p.Pattern, err)
m.Logger().Error(msg)
r.Error(err)
continue
}
if p.Limit > 0 && len(keys) > int(p.Limit) {
m.Logger().Debugf("Collecting stats for %d keys, but there are more available for pattern '%s' in keyspace %d", p.Limit, p.Pattern, keyspace)
keys = keys[:p.Limit]
}
for _, key := range keys {
keyInfo, err := redis.FetchKeyInfo(conn, key, m.Logger())
if err != nil {
msg := fmt.Errorf("Failed to fetch key info for key %s in keyspace %d", key, keyspace)
m.Logger().Error(msg)
r.Error(err)
continue
}
if keyInfo == nil {
m.Logger().Debugf("Ignoring removed key %s from keyspace %d", key, keyspace)
continue
}
event := eventMapping(keyspace, keyInfo)
if !r.Event(event) {
m.Logger().Debug("Failed to report event, interrupting fetch")
return nil
}
}
}
return nil
}