pkg/output/shipper/shipper.go (108 lines of code) (raw):
package shipper
import (
"context"
"fmt"
"time"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-shipper-client/pkg/helpers"
sc "github.com/elastic/elastic-agent-shipper-client/pkg/proto"
"github.com/elastic/elastic-agent-shipper-client/pkg/proto/messages"
"github.com/elastic/go-ucfg"
"github.com/elastic/spigot/pkg/output"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)
// Name is the identifier under which this output is registered with the
// output package (see init).
const Name = "shipper"
// ShipperOutput accumulates events created by Write and publishes them in
// batches through a shipper producer client over a gRPC connection.
type ShipperOutput struct {
// client is the producer client that send uses to publish the batch.
client sc.ProducerClient
// conn is the gRPC connection dialed in New and closed in Close.
conn *grpc.ClientConn
// batch holds the events appended by Write since the last successful send.
batch []*messages.Event
// config is the unpacked output configuration (address, timeout, input and
// stream ids, data stream settings).
config config
}
// init registers the New factory with the output package under Name.
func init() {
output.Register(Name, New)
}
// New is the factory for the shipper output.
//
// It unpacks cfg on top of the package defaults, dials the configured address
// using the default dial options under a context bounded by the configured
// timeout, and returns a ShipperOutput wrapping a producer client for that
// connection. An unpack failure is returned as-is; a dial failure is wrapped
// with the target address.
func New(cfg *ucfg.Config) (s output.Output, err error) {
	settings := defaultConfig()
	if err = cfg.Unpack(&settings); err != nil {
		return nil, err
	}

	dialOpts := defaultDialOptions()
	dialCtx, stop := context.WithTimeout(context.Background(), settings.Timeout)
	defer stop()

	conn, dialErr := grpc.DialContext(dialCtx, settings.Address, dialOpts...)
	if dialErr != nil {
		return nil, fmt.Errorf("error dialing %s: %w", settings.Address, dialErr)
	}

	return &ShipperOutput{
		client: sc.NewProducerClient(conn),
		conn:   conn,
		config: settings,
	}, nil
}
// Write turns b into one shipper event and appends it to the pending batch.
// Nothing is sent here; the batch is published by NewInterval or Close.
//
// The event carries the current time as its timestamp, the configured input
// and stream ids (both as the typed Source and as metadata), the configured
// data stream, and a fields document holding the payload under "message"
// plus the data stream settings under "data_stream".
//
// It returns len(b) on success. If either the metadata or the fields map
// cannot be converted by helpers.NewStruct, it returns 0 and that error, and
// the batch is left unchanged.
func (s *ShipperOutput) Write(b []byte) (n int, err error) {
	source := &messages.Source{
		InputId:  s.config.InputId,
		StreamId: s.config.StreamId,
	}
	datastream := &messages.DataStream{
		Type:      s.config.DataStreamType,
		Dataset:   s.config.DataStreamDataset,
		Namespace: s.config.DataStreamNamespace,
	}

	meta := mapstr.M{
		"input_id":  s.config.InputId,
		"stream_id": s.config.StreamId,
	}
	metaStruct, err := helpers.NewStruct(meta)
	if err != nil {
		return 0, err
	}

	fields := mapstr.M{
		"message": string(b),
		"data_stream": mapstr.M{
			"type": s.config.DataStreamType,
			// The ECS field name is data_stream.dataset (no underscore),
			// matching the Dataset field of the typed DataStream above.
			"dataset":   s.config.DataStreamDataset,
			"namespace": s.config.DataStreamNamespace,
		},
	}
	fieldsStruct, err := helpers.NewStruct(fields)
	if err != nil {
		return 0, err
	}

	e := &messages.Event{
		Timestamp:  timestamppb.New(time.Now()),
		Source:     source,
		DataStream: datastream,
		Metadata:   metaStruct,
		Fields:     fieldsStruct,
	}
	s.batch = append(s.batch, e)
	return len(b), nil
}
// Close publishes any events still pending in the batch and then closes the
// gRPC connection.
//
// The connection is closed even when the final publish fails, so a failed
// flush does not leak it. The publish error takes precedence in the returned
// error; if closing fails as well, the close error is appended to the
// message. When no connection was established there is nothing to close and
// only the publish result is returned.
func (s *ShipperOutput) Close() error {
	var sendErr error
	if len(s.batch) > 0 {
		sendErr = s.send()
	}

	// send treats a nil conn as "not established"; mirror that here instead
	// of calling Close on a nil connection.
	if s.conn == nil {
		return sendErr
	}

	closeErr := s.conn.Close()
	switch {
	case sendErr != nil && closeErr != nil:
		return fmt.Errorf("%w; closing connection also failed: %v", sendErr, closeErr)
	case sendErr != nil:
		return sendErr
	default:
		return closeErr
	}
}
// NewInterval publishes the events batched so far by delegating to send and
// returns whatever error send reports.
func (s *ShipperOutput) NewInterval() error {
return s.send()
}
// send publishes the pending batch in a single PublishEvents call bounded by
// the configured timeout.
//
// It returns an error if no connection has been established or if the
// publish call fails; in both cases the batch is kept so it can be retried.
// An empty batch is a no-op, so callers such as NewInterval do not issue an
// RPC when nothing was written. On success the batch is emptied for reuse.
//
// NOTE(review): the publish reply is discarded. If the shipper can accept
// only part of a batch, the unaccepted events would be dropped here —
// confirm against the PublishReply contract.
func (s *ShipperOutput) send() error {
	if s.conn == nil {
		return fmt.Errorf("connection is not established to: %s", s.config.Address)
	}
	if len(s.batch) == 0 {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout)
	defer cancel()

	if _, err := s.client.PublishEvents(ctx, &messages.PublishRequest{Events: s.batch}); err != nil {
		return fmt.Errorf("publish events failed: %w", err)
	}

	// Drop the references to the published events before truncating so the
	// reused backing array does not keep them alive until overwritten.
	for i := range s.batch {
		s.batch[i] = nil
	}
	s.batch = s.batch[:0]
	return nil
}