in pkg/output/shipper/shipper.go [54:94]
// Write wraps b in a single messages.Event and appends that event to s.batch.
//
// The event carries the current wall-clock time as its timestamp, the input
// and stream IDs from s.config (both as the typed Source and as metadata),
// and the data stream type/dataset/namespace from s.config (both as the typed
// DataStream and under the "data_stream" key of the event fields). The payload
// is stored as a string under the "message" field.
//
// On success it returns len(b) and a nil error. If either the metadata or the
// fields map cannot be converted by helpers.NewStruct, it returns 0 and that
// error, and nothing is appended to the batch.
//
// NOTE(review): s.batch is appended to without any synchronization visible in
// this method; confirm that callers serialize access.
func (s *ShipperOutput) Write(b []byte) (n int, err error) {
	cfg := s.config

	metaStruct, err := helpers.NewStruct(mapstr.M{
		"input_id":  cfg.InputId,
		"stream_id": cfg.StreamId,
	})
	if err != nil {
		return 0, err
	}

	fieldsStruct, err := helpers.NewStruct(mapstr.M{
		// string(b) copies the payload, so the event does not alias the
		// caller's buffer after Write returns.
		"message": string(b),
		"data_stream": mapstr.M{
			"type": cfg.DataStreamType,
			// The data stream naming scheme uses "dataset" (one word),
			// matching the Dataset field of messages.DataStream below.
			"dataset":   cfg.DataStreamDataset,
			"namespace": cfg.DataStreamNamespace,
		},
	})
	if err != nil {
		return 0, err
	}

	s.batch = append(s.batch, &messages.Event{
		Timestamp: timestamppb.New(time.Now()),
		Source: &messages.Source{
			InputId:  cfg.InputId,
			StreamId: cfg.StreamId,
		},
		DataStream: &messages.DataStream{
			Type:      cfg.DataStreamType,
			Dataset:   cfg.DataStreamDataset,
			Namespace: cfg.DataStreamNamespace,
		},
		Metadata: metaStruct,
		Fields:   fieldsStruct,
	})
	return len(b), nil
}