metricbeat/mb/testing/modules.go (239 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. /* Package testing provides utility functions for testing Module and MetricSet implementations. # MetricSet Example This is an example showing how to use this package to test a MetricSet. By using these methods you ensure the MetricSet is instantiated in the same way that Metricbeat does it and with the same validations. package mymetricset_test import ( "github.com/stretchr/testify/assert" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" ) func TestFetch(t *testing.T) { f := mbtest.NewFetcher(t, getConfig()) events, errs := f.FetchEvents() assert.Empty(t, errs) assert.NotEmpty(t, events) event := events[0] t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) // Test event attributes... } func getConfig() map[string]interface{} { return map[string]interface{}{ "module": "mymodule", "metricsets": []string{"status"}, "hosts": []string{mymodule.GetHostFromEnv()}, } } */ package testing import ( "context" "testing" "time" "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/go-concert/timed" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp/logptest" ) type TestModule struct { ModName string ModConfig mb.ModuleConfig RawConfig *conf.C } func (m *TestModule) Name() string { return m.ModName } func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } func (m *TestModule) UpdateStatus(_ status.Status, _ string) {} func (m *TestModule) SetStatusReporter(_ status.StatusReporter) {} func NewTestModule(t testing.TB, config interface{}) *TestModule { c, err := conf.NewConfigFrom(config) if err != nil { t.Fatal(err) } return &TestModule{RawConfig: c} } // NewMetricSet instantiates a new MetricSet using the given configuration. // The ModuleFactory and MetricSetFactory are obtained from the global // Registry. func NewMetricSet(t testing.TB, config interface{}) mb.MetricSet { return NewMetricSetWithRegistry(t, config, mb.Registry) } // NewMetricSetWithRegistry instantiates a new MetricSet using the given configuration. // The ModuleFactory and MetricSetFactory are obtained from the passed in registry. func NewMetricSetWithRegistry(t testing.TB, config interface{}, registry *mb.Register) mb.MetricSet { metricsets := NewMetricSetsWithRegistry(t, config, registry) if len(metricsets) != 1 { t.Fatal("invalid number of metricsets instantiated") } metricset := metricsets[0] if metricset == nil { t.Fatal("metricset is nil") } return metricset } // NewMetricSets instantiates a list of new MetricSets using the given // module configuration. func NewMetricSets(t testing.TB, config interface{}) []mb.MetricSet { return NewMetricSetsWithRegistry(t, config, mb.Registry) } // NewMetricSetsWithRegistry instantiates a list of new MetricSets using the given // module configuration and provided registry. func NewMetricSetsWithRegistry(t testing.TB, config interface{}, registry *mb.Register) []mb.MetricSet { c, err := conf.NewConfigFrom(config) if err != nil { t.Fatal(err) } m, metricsets, err := mb.NewModule(c, registry, logptest.NewTestingLogger(t, "")) if err != nil { t.Fatal("failed to create new MetricSet", err) } if m == nil { t.Fatal("no module instantiated") } return metricsets } // NewReportingMetricSetV2 returns a new ReportingMetricSetV2 instance. Then // you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetricSetV2 { return NewReportingMetricSetV2WithRegistry(t, config, mb.Registry) } // NewReportingMetricSetV2WithRegistry returns a new ReportingMetricSetV2 instance. Then // you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. func NewReportingMetricSetV2WithRegistry(t testing.TB, config interface{}, registry *mb.Register) mb.ReportingMetricSetV2 { metricSet := NewMetricSetWithRegistry(t, config, registry) reportingMetricSetV2, ok := metricSet.(mb.ReportingMetricSetV2) if !ok { t.Fatal("MetricSet does not implement ReportingMetricSetV2") } return reportingMetricSetV2 } // NewReportingMetricSetV2Error returns a new ReportingMetricSetV2 instance. Then // you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.ReportingMetricSetV2Error { metricSet := NewMetricSet(t, config) reportingMetricSetV2Error, ok := metricSet.(mb.ReportingMetricSetV2Error) if !ok { t.Fatal("MetricSet does not implement ReportingMetricSetV2Error") } return reportingMetricSetV2Error } // NewReportingMetricSetV2Errors returns an array of new ReportingMetricSetV2 instances. func NewReportingMetricSetV2Errors(t testing.TB, config interface{}) []mb.ReportingMetricSetV2Error { metricSets := NewMetricSets(t, config) reportingMetricSets := make([]mb.ReportingMetricSetV2Error, 0, len(metricSets)) for _, metricSet := range metricSets { rMS, ok := metricSet.(mb.ReportingMetricSetV2Error) if !ok { t.Fatalf("MetricSet %v does not implement ReportingMetricSetV2Error", metricSet.Name()) } reportingMetricSets = append(reportingMetricSets, rMS) } return reportingMetricSets } // NewReportingMetricSetV2WithContext returns a new ReportingMetricSetV2WithContext instance. Then // you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet. func NewReportingMetricSetV2WithContext(t testing.TB, config interface{}) mb.ReportingMetricSetV2WithContext { metricSet := NewMetricSet(t, config) reportingMetricSet, ok := metricSet.(mb.ReportingMetricSetV2WithContext) if !ok { t.Fatal("MetricSet does not implement ReportingMetricSetV2WithContext") } return reportingMetricSet } // CapturingReporterV2 is a reporter used for testing which stores all events and errors type CapturingReporterV2 struct { events []mb.Event errs []error } // Event is used to report an event func (r *CapturingReporterV2) Event(event mb.Event) bool { r.events = append(r.events, event) return true } // Error is used to report an error func (r *CapturingReporterV2) Error(err error) bool { r.errs = append(r.errs, err) return true } // GetEvents returns all reported events func (r *CapturingReporterV2) GetEvents() []mb.Event { return r.events } // GetErrors returns all reported errors func (r *CapturingReporterV2) GetErrors() []error { return r.errs } // ReportingFetchV2 runs the given reporting metricset and returns all of the // events and errors that occur during that period. func ReportingFetchV2(metricSet mb.ReportingMetricSetV2) ([]mb.Event, []error) { r := &CapturingReporterV2{} metricSet.Fetch(r) return r.events, r.errs } // ReportingFetchV2Error runs the given reporting metricset and returns all of the // events and errors that occur during that period. func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event, []error) { r := &CapturingReporterV2{} err := metricSet.Fetch(r) if err != nil { r.errs = append(r.errs, err) } return r.events, r.errs } // PeriodicReportingFetchV2Error runs the given metricset and returns // the first batch of events or errors that occur during that period. // // `period` is the time between each fetch. // `timeout` is the maximum time to wait for the first event. // // The function tries to fetch the metrics every `period` until it gets // the first batch of metrics or the `timeout` is reached. func PeriodicReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error, period time.Duration, timeout time.Duration) ([]mb.Event, []error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() r := &CapturingReporterV2{} _ = timed.Periodic(ctx, period, func() error { // Fetch the metrics and store them in the // reporter. if err := metricSet.Fetch(r); err != nil { r.errs = append(r.errs, err) return err } if len(r.events) > 0 { // We have metrics, stop the periodic // and return the metrics. cancel() } // No metrics yet, retry again // in the next period. return nil }) return r.events, r.errs } // ReportingFetchV2WithContext runs the given reporting metricset and returns all of the // events and errors that occur during that period. func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ([]mb.Event, []error) { r := &CapturingReporterV2{} err := metricSet.Fetch(context.Background(), r) if err != nil { r.errs = append(r.errs, err) } return r.events, r.errs } // NewPushMetricSetV2 instantiates a new PushMetricSetV2 using the given // configuration. The ModuleFactory and MetricSetFactory are obtained from the // global Registry. func NewPushMetricSetV2(t testing.TB, config interface{}) mb.PushMetricSetV2 { metricSet := NewMetricSet(t, config) pushMetricSet, ok := metricSet.(mb.PushMetricSetV2) if !ok { t.Fatal("MetricSet does not implement PushMetricSetV2") } return pushMetricSet } // NewPushMetricSetV2WithRegistry instantiates a new PushMetricSetV2 using the given // configuration. The ModuleFactory and MetricSetFactory are obtained from the // passed in the registry. func NewPushMetricSetV2WithRegistry(t testing.TB, config interface{}, registry *mb.Register) mb.PushMetricSetV2 { metricSet := NewMetricSetWithRegistry(t, config, registry) pushMetricSet, ok := metricSet.(mb.PushMetricSetV2) if !ok { t.Fatal("MetricSet does not implement PushMetricSetV2") } return pushMetricSet } // NewPushMetricSetV2WithContext instantiates a new PushMetricSetV2WithContext // using the given configuration. The ModuleFactory and MetricSetFactory are // obtained from the global Registry. func NewPushMetricSetV2WithContext(t testing.TB, config interface{}) mb.PushMetricSetV2WithContext { metricSet := NewMetricSet(t, config) pushMetricSet, ok := metricSet.(mb.PushMetricSetV2WithContext) if !ok { t.Fatal("MetricSet does not implement PushMetricSetV2WithContext") } return pushMetricSet } // CapturingPushReporterV2 stores all the events and errors from a metricset's // Run method. type CapturingPushReporterV2 struct { context.Context eventsC chan mb.Event } func newCapturingPushReporterV2(ctx context.Context) *CapturingPushReporterV2 { return &CapturingPushReporterV2{Context: ctx, eventsC: make(chan mb.Event)} } // report writes an event to the output channel and returns true. If the output // is closed it returns false. func (r *CapturingPushReporterV2) report(event mb.Event) bool { select { case <-r.Done(): // Publisher is stopped. return false case r.eventsC <- event: return true } } // Event stores the passed-in event into the events array func (r *CapturingPushReporterV2) Event(event mb.Event) bool { return r.report(event) } // Error stores the given error into the errors array. func (r *CapturingPushReporterV2) Error(err error) bool { return r.report(mb.Event{Error: err}) } func (r *CapturingPushReporterV2) capture(waitEvents int) []mb.Event { var events []mb.Event for { select { case <-r.Done(): // Timeout return events case e := <-r.eventsC: events = append(events, e) if waitEvents > 0 && len(events) >= waitEvents { return events } } } } // BlockingCapture blocks until waitEvents n of events are captured func (r *CapturingPushReporterV2) BlockingCapture(waitEvents int) []mb.Event { events := make([]mb.Event, 0, waitEvents) for e := range r.eventsC { events = append(events, e) if waitEvents > 0 && len(events) >= waitEvents { return events } } return events } // RunPushMetricSetV2 run the given push metricset for the specific amount of // time and returns all of the events and errors that occur during that period. func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2) []mb.Event { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() r := newCapturingPushReporterV2(ctx) go metricSet.Run(r) return r.capture(waitEvents) } // GetCapturingPushReporterV2 is a factory for a capturing push metricset func GetCapturingPushReporterV2() mb.PushReporterV2 { return newCapturingPushReporterV2(context.Background()) } // RunPushMetricSetV2WithContext run the given push metricset for the specific amount of // time and returns all of the events that occur during that period. func RunPushMetricSetV2WithContext(timeout time.Duration, waitEvents int, metricSet mb.PushMetricSetV2WithContext) []mb.Event { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() r := newCapturingPushReporterV2(ctx) go metricSet.Run(ctx, r) return r.capture(waitEvents) }