libbeat/otelbeat/oteltest/oteltest.go (99 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package oteltest provides test utilities for OpenTelemetry and Beats components. package oteltest import ( "context" "sync" "testing" "time" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest/observer" ) type ReceiverConfig struct { Name string Config component.Config Factory receiver.Factory } type CheckReceiversParams struct { T *testing.T // Receivers is a list of receiver configurations to create. Receivers []ReceiverConfig // AssertFunc is a function that asserts the test conditions. // The function is called periodically until the assertions are met or the timeout is reached. AssertFunc func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) } // CheckReceivers creates receivers using the provided configuration. func CheckReceivers(params CheckReceiversParams) { t := params.T ctx := t.Context() var logsMu sync.Mutex logs := make(map[string][]mapstr.M) zapCore := zapcore.NewCore( zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), &zaptest.Discarder{}, zapcore.DebugLevel, ) observed, zapLogs := observer.New(zapcore.DebugLevel) core := zapcore.NewTee(zapCore, observed) createReceiver := func(t *testing.T, rc ReceiverConfig) receiver.Logs { t.Helper() var receiverSettings receiver.Settings // Replicate the behavior of the collector logger receiverCore := core. With([]zapcore.Field{ zap.String("otelcol.component.id", rc.Name), zap.String("otelcol.component.kind", "receiver"), zap.String("otelcol.signal", "logs"), }) receiverSettings.Logger = zap.New(receiverCore) receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name) logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { for _, rl := range ld.ResourceLogs().All() { for _, sl := range rl.ScopeLogs().All() { for _, lr := range sl.LogRecords().All() { logsMu.Lock() logs[rc.Name] = append(logs[rc.Name], lr.Body().Map().AsRaw()) logsMu.Unlock() } } } return nil }) assert.NoErrorf(t, err, "Error creating log consumer for %q", rc.Name) r, err := rc.Factory.CreateLogs(ctx, receiverSettings, rc.Config, logConsumer) assert.NoErrorf(t, err, "Error creating receiver %q", rc.Name) return r } // Replicate the collector behavior to instantiate components first and then start them. var receivers []receiver.Logs for _, rec := range params.Receivers { receivers = append(receivers, createReceiver(t, rec)) } for i, r := range receivers { err := r.Start(ctx, nil) require.NoErrorf(t, err, "Error starting receiver %d", i) defer func() { require.NoErrorf(t, r.Shutdown(ctx), "Error shutting down receiver %d", i) }() } t.Cleanup(func() { if t.Failed() { logsMu.Lock() defer logsMu.Unlock() t.Logf("Ingested Logs: %v", logs) } }) require.EventuallyWithT(t, func(ct *assert.CollectT) { logsMu.Lock() defer logsMu.Unlock() // Ensure the logger fields from the otel collector are present in the logs. for _, zl := range zapLogs.All() { require.Contains(t, zl.ContextMap(), "otelcol.component.id") require.Equal(t, zl.ContextMap()["otelcol.component.kind"], "receiver") require.Equal(t, zl.ContextMap()["otelcol.signal"], "logs") break } params.AssertFunc(ct, logs, zapLogs) }, 2*time.Minute, 100*time.Millisecond, "timeout waiting for logger fields from the OTel collector are present in the logs and other assertions to be met") }