x-pack/heartbeat/scenarios/framework/mocks.go (72 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package framework
import (
"fmt"
"sync"
"github.com/elastic/beats/v7/libbeat/beat"
)
type mockClient struct {
publishLog []*beat.Event
pipeline beat.Pipeline
closed bool
mtx sync.Mutex
clientConfig beat.ClientConfig
}
func (c *mockClient) IsClosed() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.closed
}
func (c *mockClient) Publish(e beat.Event) {
if c.clientConfig.Processing.Processor != nil {
outE, _ := c.clientConfig.Processing.Processor.Run(&e)
e = *outE
}
c.PublishAll([]beat.Event{e})
}
func (c *mockClient) PublishAll(events []beat.Event) {
c.mtx.Lock()
defer c.mtx.Unlock()
for _, e := range events {
eLocal := e
c.publishLog = append(c.publishLog, &eLocal)
}
}
func (c *mockClient) Wait() {
}
func (c *mockClient) Close() error {
c.mtx.Lock()
defer c.mtx.Unlock()
if c.closed {
return fmt.Errorf("mock client already closed")
}
c.closed = true
return nil
}
func (c *mockClient) PublishedEvents() []*beat.Event {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.publishLog
}
type mockPipeline struct {
Clients []*mockClient
mtx sync.Mutex
}
func (pc *mockPipeline) Connect() (beat.Client, error) {
return pc.ConnectWith(beat.ClientConfig{})
}
func (pc *mockPipeline) ConnectWith(cc beat.ClientConfig) (beat.Client, error) {
pc.mtx.Lock()
defer pc.mtx.Unlock()
c := &mockClient{pipeline: pc, clientConfig: cc}
pc.Clients = append(pc.Clients, c)
return c, nil
}
func (pc *mockPipeline) PublishedEvents() []*beat.Event {
pc.mtx.Lock()
defer pc.mtx.Unlock()
var events []*beat.Event
for _, c := range pc.Clients {
events = append(events, c.PublishedEvents()...)
}
return events
}