metricbeat/module/nats/jetstream/data.go (492 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package jetstream import ( "encoding/json" "fmt" "time" s "github.com/elastic/beats/v7/libbeat/common/schema" c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/elastic-agent-libs/mapstr" ) var ( moduleSchema = s.Schema{ "server": s.Object{ "id": c.Str("server_id"), "time": c.Time("now"), }, } jetstreamStatsSchema = s.Schema{ "category": c.Str("category"), "stats": s.Object{ "streams": c.Int("streams"), "consumers": c.Int("consumers"), "messages": c.Int("messages"), "bytes": c.Int("bytes"), "memory": c.Int("memory"), "reserved_memory": c.Int("reserved_memory"), "storage": c.Int("storage"), "reserved_storage": c.Int("reserved_storage"), "accounts": c.Int("accounts"), "config": s.Object{ "max_memory": c.Int("max_memory"), "max_storage": c.Int("max_storage"), "store_dir": c.Str("store_dir"), "sync_interval": c.Int("sync_interval"), }, }, } jetstreamAccountSchema = s.Schema{ "category": c.Str("category"), "account": s.Object{ "id": c.Str("id"), "name": c.Str("name"), "memory": c.Int("memory"), "storage": c.Int("storage"), "reserved_memory": c.Int("reserved_memory"), "reserved_storage": c.Int("reserved_storage"), "accounts": c.Int("accounts"), "high_availability_assets": c.Int("ha_assets"), "api": s.Object{ "total": c.Int("api_total"), "errors": c.Int("api_errors"), }, }, } jetstreamStreamSchema = s.Schema{ "category": c.Str("category"), "stream": s.Object{ "name": c.Str("name"), "created": c.Time("created"), "cluster": s.Object{ "leader": c.Str("leader"), }, "state": s.Object{ "messages": c.Int("messages"), "bytes": c.Int("bytes"), "first_seq": c.Int("first_seq"), "first_ts": c.Time("first_ts"), "last_seq": c.Int("last_seq"), "last_ts": c.Time("last_ts"), "consumer_count": c.Int("consumer_count"), "num_deleted": c.Int("num_deleted"), "num_subjects": c.Int("num_subjects"), }, "account": s.Object{ "id": c.Str("account_id"), "name": c.Str("account_name"), }, "config": s.Object{ "description": c.Str("config_description"), "retention": c.Str("config_retention"), "num_replicas": c.Int("config_num_replicas"), "storage": c.Str("config_storage"), "max_consumers": c.Int("config_max_consumers"), "max_msgs": c.Int("config_max_msgs"), "max_bytes": c.Int("config_max_bytes"), "max_age": c.Int("config_max_age"), "max_msgs_per_subject": c.Int("config_max_msgs_per_subject"), "max_msg_size": c.Int("config_max_msg_size"), "subjects": s.Conv{ Key: "config_subjects", Func: func(key string, data map[string]interface{}) (interface{}, error) { emptyIface, err := mapstr.M(data).GetValue(key) if err != nil { return []string{}, s.NewKeyNotFoundError(key) } switch val := emptyIface.(type) { case []string: return val, nil default: msg := fmt.Sprintf("expected []string, found %T", emptyIface) return []string{}, s.NewWrongFormatError(key, msg) } }, }, }, }, } jetstreamConsumerSchema = s.Schema{ "category": c.Str("category"), "consumer": s.Object{ "name": c.Str("name"), "created": c.Time("created"), "stream": s.Object{ "name": c.Str("stream_name"), }, "cluster": s.Object{ "leader": c.Str("leader"), }, "delivered": s.Object{ "consumer_seq": c.Int("delivered_consumer_seq"), "stream_seq": c.Int("delivered_stream_seq"), "last_active": c.Time("delivered_last_active"), }, "ack_floor": s.Object{ "consumer_seq": c.Int("ack_consumer_seq"), "stream_seq": c.Int("ack_stream_seq"), "last_active": c.Time("ack_last_active"), }, "num_ack_pending": c.Int("num_ack_pending"), "num_redelivered": c.Int("num_redelivered"), "num_waiting": c.Int("num_waiting"), "num_pending": c.Int("num_pending"), "last_active_time": c.Time("ts"), "account": s.Object{ "id": c.Str("account_id"), "name": c.Str("account_name"), }, "config": s.Object{ "name": c.Str("name"), "durable_name": c.Str("config_durable_name"), "deliver_policy": c.Str("config_deliver_policy"), "filter_subject": c.Str("config_filter_subject"), "replay_policy": c.Str("config_replay_policy"), "ack_policy": c.Str("config_ack_policy"), "ack_wait": c.Int("config_ack_wait"), "max_deliver": c.Int("config_max_deliver"), "max_waiting": c.Int("config_max_waiting"), "max_ack_pending": c.Int("config_max_ack_pending"), "num_replicas": c.Int("config_num_replicas"), }, }, } ) type NamedItem interface { GetName() string } type JetstreamResponse struct { AccountDetails []JetstreamAccountDetails `json:"account_details"` Bytes uint64 `json:"bytes"` Config JetstreamConfig `json:"config,omitempty"` Consumers uint64 `json:"consumers"` Messages uint64 `json:"messages"` Now time.Time `json:"now,omitempty"` ServerID string `json:"server_id,omitempty"` Streams uint64 `json:"streams"` Memory uint64 `json:"memory"` Storage uint64 `json:"storage"` ReservedMemory uint64 `json:"reserved_memory"` ReservedStorage uint64 `json:"reserved_storage"` Accounts uint64 `json:"accounts"` } type JetstreamConfig struct { MaxMemory uint64 `json:"max_memory"` MaxStorage uint64 `json:"max_storage"` StoreDir string `json:"store_dir,omitempty"` SyncInterval uint64 `json:"sync_interval"` } type JetstreamAccountDetails struct { Id string `json:"id,omitempty"` Name string `json:"name,omitempty"` Memory uint64 `json:"memory"` Storage uint64 `json:"storage"` Accounts uint64 `json:"accounts"` ReservedMemory uint64 `json:"reserved_memory"` ReservedStorage uint64 `json:"reserved_storage"` HighAvailabilityAssets uint64 `json:"ha_assets"` ApiStats AccountApiStats `json:"api"` StreamDetails []JetstreamStreamDetail `json:"stream_detail"` } type AccountApiStats struct { Total uint64 `json:"total"` Errors uint64 `json:"errors"` } func (me JetstreamAccountDetails) GetName() string { return me.Name } type JetstreamStreamDetail struct { Cluster JetstreamStreamClusterInfo `json:"cluster"` Consumers []JetstreamConsumerDetail `json:"consumer_detail"` Created time.Time `json:"created,omitempty"` Name string `json:"name,omitempty"` State JetstreamStreamState `json:"state"` Config JetstreamStreamConfig `json:"config"` } func (me JetstreamStreamDetail) GetName() string { return me.Name } type JetstreamStreamConfig struct { Description string `json:"description,omitempty"` Retention string `json:"retention"` Subjects []string `json:"subjects"` AllowRollupHeaders bool `json:"allow_rollup_hdrs"` DenyPurge bool `json:"deny_purge"` DenyDelete bool `json:"deny_delete"` Sealed bool `json:"sealed"` MirrorDirect bool `json:"mirror_direct"` AllowDirect bool `json:"allow_direct"` Compression string `json:"compression,omitempty"` DuplicateWindow uint64 `json:"duplicate_window"` NumReplicas uint64 `json:"num_replicas"` Storage string `json:"storage"` Discard string `json:"discard"` // These can be negative so should remain int64 MaxConsumers int64 `json:"max_consumers"` MaxMsgs int64 `json:"max_msgs"` MaxBytes int64 `json:"max_bytes"` MaxAge int64 `json:"max_age"` MaxMessagesPerSubject int64 `json:"max_msgs_per_subject"` MaxMessageSize int64 `json:"max_msg_size"` } type JetstreamStreamClusterInfo struct { Leader string `json:"leader"` } type JetstreamStreamState struct { Bytes uint64 `json:"bytes"` ConsumerCount uint64 `json:"consumer_count"` FirstSequence uint64 `json:"first_seq"` FirstTimestamp time.Time `json:"first_ts,omitempty"` LastSequence uint64 `json:"last_seq"` LastTimestamp time.Time `json:"last_ts,omitempty"` Messages uint64 `json:"messages"` NumSubjects uint64 `json:"num_subjects"` NumDeleted uint64 `json:"num_deleted"` } type JetstreamConsumerDetail struct { AckFloor JetstreamConsumerAckFloor `json:"ack_floor"` Created time.Time `json:"created,omitempty"` Config JetstreamConsumerConfig `json:"config"` Delivered JetstreamConsumerDelivered `json:"delivered"` Name string `json:"name,omitempty"` NumAckPending uint64 `json:"num_ack_pending"` NumRedelivered uint64 `json:"num_redelivered"` NumPending uint64 `json:"num_pending"` NumWaiting uint64 `json:"num_waiting"` StreamName string `json:"stream_name,omitempty"` Timestamp time.Time `json:"ts,omitempty"` } func (me JetstreamConsumerDetail) GetName() string { return me.Name } type JetstreamConsumerConfig struct { DurableName string `json:"durable_name,omitempty"` DeliverPolicy string `json:"deliver_policy,omitempty"` AckPolicy string `json:"ack_policy,omitempty"` FilterSubject string `json:"filter_subject,omitempty"` ReplayPolicy string `json:"replay_policy,omitempty"` // These can be negative so must be int64 MaxWaiting int64 `json:"max_waiting"` MaxAckPending int64 `json:"max_ack_pending"` NumReplicas int64 `json:"num_replicas"` AckWait int64 `json:"ack_wait"` MaxDeliver int64 `json:"max_deliver"` } type JetstreamConsumerDelivered struct { ConsumerSequence uint64 `json:"consumer_seq"` StreamSequence uint64 `json:"stream_seq"` LastActive time.Time `json:"last_active,omitempty"` } type JetstreamConsumerAckFloor struct { ConsumerSequence uint64 `json:"consumer_seq"` StreamSequence uint64 `json:"stream_seq"` LastActive time.Time `json:"last_active,omitempty"` } func eventMapping(m *MetricSet, r mb.ReporterV2, content []byte) error { var response JetstreamResponse err := json.Unmarshal(content, &response) if err != nil { return fmt.Errorf("failure parsing NATS Jetstream API response: %w", err) } if m.Config.Stats.Enabled { err = statsMapping(r, response) } for _, account := range filterByName(response.AccountDetails, m.Config.Account.Names) { if m.Config.Account.Enabled { err = accountMapping(r, account, response) } for _, stream := range filterByName(account.StreamDetails, m.Config.Stream.Names) { if m.Config.Stream.Enabled { err = streamMapping(r, stream, account, response) } for _, consumer := range filterByName(stream.Consumers, m.Config.Consumer.Names) { if m.Config.Consumer.Enabled { err = consumerMapping(r, consumer, stream, account, response) } } } } return err } func statsMapping(r mb.ReporterV2, response JetstreamResponse) error { moduleFields, timestamp, err := getSharedEventDetails(response) if err != nil { return fmt.Errorf("failure applying module schema: %w", err) } metricSetFields, err := jetstreamStatsSchema.Apply(map[string]interface{}{ "category": statsCategory, "max_memory": response.Config.MaxMemory, "max_storage": response.Config.MaxStorage, "store_dir": response.Config.StoreDir, "sync_interval": response.Config.SyncInterval, "streams": response.Streams, "consumers": response.Consumers, "messages": response.Messages, "bytes": response.Bytes, "memory": response.Memory, "storage": response.Storage, "reserved_memory": response.ReservedMemory, "reserved_storage": response.ReservedStorage, "accounts": response.Accounts, }) if err != nil { return fmt.Errorf("failure applying jetstream.stats schema: %w", err) } // Create and emit the event event := mb.Event{ MetricSetFields: metricSetFields, ModuleFields: moduleFields, Timestamp: timestamp, } if !r.Event(event) { return nil } return nil } func filterByName[T NamedItem](collection []T, allowedValues []string) []T { // No filters. Return all. if len(allowedValues) == 0 { return collection } // Put into map for faster lookup filters := map[string]bool{} for _, val := range allowedValues { filters[val] = true } filtered := make([]T, 0) for _, item := range collection { if filters[item.GetName()] { filtered = append(filtered, item) } } return filtered } func accountMapping(r mb.ReporterV2, account JetstreamAccountDetails, response JetstreamResponse) error { moduleFields, timestamp, err := getSharedEventDetails(response) if err != nil { return fmt.Errorf("failure applying module schema: %w", err) } metricSetFields, err := jetstreamAccountSchema.Apply(map[string]interface{}{ "category": accountCategory, "id": account.Id, "name": account.Name, "memory": account.Memory, "storage": account.Storage, "reserved_memory": account.ReservedMemory, "reserved_storage": account.ReservedStorage, "accounts": account.Accounts, "ha_assets": account.HighAvailabilityAssets, "api_total": account.ApiStats.Total, "api_errors": account.ApiStats.Errors, }) if err != nil { return fmt.Errorf("failure applying jetstream.account schema: %w", err) } // Create and emit the event event := mb.Event{ MetricSetFields: metricSetFields, ModuleFields: moduleFields, Timestamp: timestamp, } if !r.Event(event) { return nil } return nil } func streamMapping(r mb.ReporterV2, stream JetstreamStreamDetail, account JetstreamAccountDetails, response JetstreamResponse) error { moduleFields, timestamp, err := getSharedEventDetails(response) if err != nil { return fmt.Errorf("failure applying module schema: %w", err) } metricSetFields, err := jetstreamStreamSchema.Apply(map[string]interface{}{ "category": streamCategory, "name": stream.Name, "created": stream.Created, "leader": stream.Cluster.Leader, "messages": stream.State.Messages, "bytes": stream.State.Bytes, "first_seq": stream.State.FirstSequence, "first_ts": stream.State.FirstTimestamp, "last_seq": stream.State.LastSequence, "last_ts": stream.State.LastTimestamp, "consumer_count": stream.State.ConsumerCount, "num_deleted": stream.State.NumDeleted, "num_subjects": stream.State.NumSubjects, "account_id": account.Id, "account_name": account.Name, "config_description": stream.Config.Description, "config_retention": stream.Config.Retention, "config_num_replicas": stream.Config.NumReplicas, "config_storage": stream.Config.Storage, "config_max_consumers": stream.Config.MaxConsumers, "config_subjects": stream.Config.Subjects, "config_max_msgs": stream.Config.MaxMsgs, "config_max_bytes": stream.Config.MaxBytes, "config_max_age": stream.Config.MaxAge, "config_max_msgs_per_subject": stream.Config.MaxMessagesPerSubject, "config_max_msg_size": stream.Config.MaxMessageSize, }) if err != nil { return fmt.Errorf("failure applying jetstream.stream schema: %w", err) } // Create and emit the event event := mb.Event{ MetricSetFields: metricSetFields, ModuleFields: moduleFields, Timestamp: timestamp, } if !r.Event(event) { return nil } return nil } func consumerMapping(r mb.ReporterV2, consumer JetstreamConsumerDetail, stream JetstreamStreamDetail, account JetstreamAccountDetails, response JetstreamResponse) error { moduleFields, timestamp, err := getSharedEventDetails(response) if err != nil { return fmt.Errorf("failure applying module schema: %w", err) } metricSetFields, err := jetstreamConsumerSchema.Apply(map[string]interface{}{ "category": consumerCategory, "stream_name": stream.Name, "name": consumer.Name, "leader": stream.Cluster.Leader, "created": consumer.Created, "delivered_consumer_seq": consumer.Delivered.ConsumerSequence, "delivered_stream_seq": consumer.Delivered.StreamSequence, "delivered_last_active": consumer.Delivered.LastActive, "ack_consumer_seq": consumer.AckFloor.ConsumerSequence, "ack_stream_seq": consumer.AckFloor.StreamSequence, "ack_last_active": consumer.AckFloor.LastActive, "num_ack_pending": consumer.NumAckPending, "num_redelivered": consumer.NumRedelivered, "num_waiting": consumer.NumWaiting, "num_pending": consumer.NumPending, "ts": consumer.Timestamp, "account_id": account.Id, "account_name": account.Name, "config_durable_name": consumer.Config.DurableName, "config_deliver_policy": consumer.Config.DeliverPolicy, "config_filter_subject": consumer.Config.FilterSubject, "config_replay_policy": consumer.Config.ReplayPolicy, "config_ack_policy": consumer.Config.AckPolicy, "config_ack_wait": consumer.Config.AckWait, "config_max_deliver": consumer.Config.MaxDeliver, "config_max_waiting": consumer.Config.MaxWaiting, "config_max_ack_pending": consumer.Config.MaxAckPending, "config_num_replicas": consumer.Config.NumReplicas, }) if err != nil { return fmt.Errorf("failure applying jetstream.consumer schema: %w", err) } // Create and emit the event event := mb.Event{ MetricSetFields: metricSetFields, ModuleFields: moduleFields, Timestamp: timestamp, } if !r.Event(event) { return nil } return nil } func getSharedEventDetails(response JetstreamResponse) (mapstr.M, time.Time, error) { moduleFields, err := moduleSchema.Apply(map[string]interface{}{ "server_id": response.ServerID, "now": response.Now, }) if err != nil { return nil, time.Now(), fmt.Errorf("failure applying module schema: %w", err) } return moduleFields, response.Now, nil }