func()

in pkg/output/shipper/shipper.go [54:94]


// Write wraps b in a single messages.Event and appends that event to s.batch.
//
// The event carries the input and stream IDs from s.config (both as its
// Source and as its Metadata), the configured data stream type, dataset and
// namespace, and b itself as the "message" field.
//
// On success it returns len(b) and a nil error. If helpers.NewStruct fails for
// either the metadata or the fields map, it returns 0 and that error, and
// nothing is appended to s.batch.
func (s *ShipperOutput) Write(b []byte) (n int, err error) {
	// Metadata is converted first, so a failure here is reported before the
	// fields map is built.
	metadata, err := helpers.NewStruct(mapstr.M{
		"input_id":  s.config.InputId,
		"stream_id": s.config.StreamId,
	})
	if err != nil {
		return 0, err
	}

	// string(b) copies the bytes, so the event does not alias the caller's
	// buffer after Write returns.
	// NOTE(review): the key below is "data_set", while the ECS field is named
	// data_stream.dataset — confirm which spelling the consumer expects.
	payload, err := helpers.NewStruct(mapstr.M{
		"message": string(b),
		"data_stream": mapstr.M{
			"type":      s.config.DataStreamType,
			"data_set":  s.config.DataStreamDataset,
			"namespace": s.config.DataStreamNamespace,
		},
	})
	if err != nil {
		return 0, err
	}

	// The timestamp is taken when the event is assembled, after both
	// conversions have succeeded.
	s.batch = append(s.batch, &messages.Event{
		Timestamp: timestamppb.New(time.Now()),
		Source: &messages.Source{
			InputId:  s.config.InputId,
			StreamId: s.config.StreamId,
		},
		DataStream: &messages.DataStream{
			Type:      s.config.DataStreamType,
			Dataset:   s.config.DataStreamDataset,
			Namespace: s.config.DataStreamNamespace,
		},
		Metadata: metadata,
		Fields:   payload,
	})
	return len(b), nil
}