plugins/outputs/graylog/graylog.go (388 lines of code) (raw):
package graylog
import (
"bytes"
"compress/zlib"
"crypto/rand"
"crypto/tls"
"encoding/binary"
ejson "encoding/json"
"fmt"
"io"
"math"
"net"
"os"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/outputs"
)
// Defaults used by this plugin for settings that were left unset.
const (
	// defaultTimeout is the connection timeout pre-set on newly created
	// Graylog plugin instances.
	defaultTimeout = 5 * time.Second

	// defaultScheme is assumed for endpoints without a "scheme://" prefix.
	defaultScheme = "udp"
	// defaultEndpoint replaces an empty endpoint in newGelfWriter.
	defaultEndpoint = "127.0.0.1:12201"
	// defaultConnection replaces an empty connection type in newGelfWriter.
	defaultConnection = "wan"

	// defaultMaxChunkSizeWan and defaultMaxChunkSizeLan replace zero-valued
	// chunk sizes (in bytes of compressed payload per UDP chunk) in
	// newGelfWriter.
	defaultMaxChunkSizeWan = 1420
	defaultMaxChunkSizeLan = 8154
)
// defaultSpecFields lists the keys that fieldInSpec treats as GELF-defined
// fields. Tags and metric fields with one of these names are emitted without
// the "_" prefix that all other keys receive.
var defaultSpecFields = []string{
	"version",
	"host",
	"short_message",
	"full_message",
	"timestamp",
	"level",
	"facility",
	"line",
	"file",
}
// gelfConfig holds the per-endpoint settings of a GELF writer. Zero values
// are replaced by the package defaults in newGelfWriter.
type gelfConfig struct {
	// Endpoint is the server address, optionally prefixed with "scheme://".
	Endpoint string
	// Connection selects the UDP chunk size: "lan" uses MaxChunkSizeLan,
	// anything else uses MaxChunkSizeWan.
	Connection string
	// MaxChunkSizeWan is the chunk payload size used for "wan" connections.
	MaxChunkSizeWan int
	// MaxChunkSizeLan is the chunk payload size used for "lan" connections.
	MaxChunkSizeLan int
}
// gelf is the transport-independent view of a GELF writer: it can be
// connected, written to and closed.
type gelf interface {
	io.Writer
	io.Closer

	// Connect establishes the connection to the configured endpoint.
	Connect() error
}
// gelfCommon bundles the state shared by the UDP and TCP writers.
type gelfCommon struct {
	gelfConfig

	// dialer is used to open new connections to the endpoint.
	dialer *net.Dialer
	// conn is the current connection; it is nil while disconnected.
	conn net.Conn
}
// gelfUDP sends zlib-compressed GELF messages over UDP, splitting large
// payloads into chunks.
type gelfUDP struct {
	gelfCommon
}
// gelfTCP sends GELF messages over TCP, each terminated by a zero byte.
type gelfTCP struct {
	gelfCommon

	// tlsConfig enables TLS for the connection when it is not nil.
	tlsConfig *tls.Config
}
// newGelfWriter builds the GELF writer for a single endpoint.
//
// Zero-valued settings in cfg are replaced by the package defaults. The
// endpoint may carry a "scheme://" prefix; the scheme is compared
// case-insensitively and removed from the stored endpoint. The scheme "tcp"
// yields a TCP writer (which uses tlsConfig when it is not nil); a missing or
// any other scheme yields a UDP writer. The returned writer is not connected.
func newGelfWriter(cfg gelfConfig, dialer *net.Dialer, tlsConfig *tls.Config) gelf {
	settings := cfg
	if settings.Endpoint == "" {
		settings.Endpoint = defaultEndpoint
	}
	if settings.Connection == "" {
		settings.Connection = defaultConnection
	}
	if settings.MaxChunkSizeWan == 0 {
		settings.MaxChunkSizeWan = defaultMaxChunkSizeWan
	}
	if settings.MaxChunkSizeLan == 0 {
		settings.MaxChunkSizeLan = defaultMaxChunkSizeLan
	}

	// Split "scheme://address" at the first separator, if there is one.
	const separator = "://"
	scheme := defaultScheme
	if pos := strings.Index(settings.Endpoint, separator); pos >= 0 {
		scheme = strings.ToLower(settings.Endpoint[:pos])
		settings.Endpoint = settings.Endpoint[pos+len(separator):]
	}

	base := gelfCommon{gelfConfig: settings, dialer: dialer}
	if scheme == "tcp" {
		return &gelfTCP{gelfCommon: base, tlsConfig: tlsConfig}
	}
	return &gelfUDP{gelfCommon: base}
}
// Write zlib-compresses message and sends it to the endpoint over UDP.
//
// A compressed payload that fits into one chunk is sent as a single datagram.
// A larger payload is split into chunks that all carry the same random 8-byte
// message ID. The chunk size comes from getChunksize, the same source
// createChunkedMessage uses to cut the payload, so the chunk count announced
// in each header always matches the number of chunks actually produced.
//
// It returns len(message) on success, or 0 together with the first error from
// compression, ID generation, chunk creation or sending.
//
// NOTE: chunk index and chunk count are encoded as single bytes by
// createChunkedMessage, which bounds the number of chunks that can be
// represented.
func (g *gelfUDP) Write(message []byte) (n int, err error) {
	compressed, err := g.compress(message)
	if err != nil {
		return 0, err
	}

	chunksize := g.getChunksize()
	length := compressed.Len()

	// Small payload: no chunking required.
	if length <= chunksize {
		if err := g.send(compressed.Bytes()); err != nil {
			return 0, err
		}
		return len(message), nil
	}

	chunkCount := int(math.Ceil(float64(length) / float64(chunksize)))

	// All chunks of one message share a random ID.
	id := make([]byte, 8)
	if _, err := rand.Read(id); err != nil {
		return 0, err
	}

	// Each call to createChunkedMessage consumes the next chunksize bytes of
	// the compressed buffer.
	for index := 0; index < chunkCount; index++ {
		packet, err := g.createChunkedMessage(index, chunkCount, id, &compressed)
		if err != nil {
			return 0, err
		}
		if err := g.send(packet.Bytes()); err != nil {
			return 0, err
		}
	}

	return len(message), nil
}
// Close closes the current connection, if any, and forgets it so that the
// next send reconnects. It returns the error of closing the connection and
// nil when there was nothing to close.
func (g *gelfUDP) Close() (err error) {
	if g.conn == nil {
		return nil
	}

	err = g.conn.Close()
	g.conn = nil
	return err
}
// createChunkedMessage assembles one chunk packet.
//
// The packet consists of the two bytes 30 and 15 (0x1e 0x0f), the message id,
// the chunk index, the total chunk count and finally the next getChunksize()
// bytes taken from compressed. Reading from compressed advances it, so
// consecutive calls yield consecutive slices of the payload.
//
// On an encoding error the partially filled packet is returned together with
// the error.
func (g *gelfUDP) createChunkedMessage(index int, chunkCountInt int, id []byte, compressed *bytes.Buffer) (bytes.Buffer, error) {
	var packet bytes.Buffer
	chunksize := g.getChunksize()

	// appendByte encodes v as a single byte and appends it to the packet.
	appendByte := func(v int) error {
		encoded, err := g.intToBytes(v)
		if err != nil {
			return err
		}
		packet.Write(encoded) //nolint:revive // from buffer.go: "err is always nil"
		return nil
	}

	// Leading marker bytes.
	for _, marker := range []int{30, 15} {
		if err := appendByte(marker); err != nil {
			return packet, err
		}
	}

	packet.Write(id) //nolint:revive // from buffer.go: "err is always nil"

	// Position of this chunk and total number of chunks.
	for _, v := range []int{index, chunkCountInt} {
		if err := appendByte(v); err != nil {
			return packet, err
		}
	}

	packet.Write(compressed.Next(chunksize)) //nolint:revive // from buffer.go: "err is always nil"
	return packet, nil
}
// getChunksize returns the chunk payload size for the configured connection
// type: MaxChunkSizeLan for "lan", MaxChunkSizeWan for "wan" and for any
// other value.
func (g *gelfUDP) getChunksize() int {
	switch g.gelfConfig.Connection {
	case "lan":
		return g.gelfConfig.MaxChunkSizeLan
	default:
		return g.gelfConfig.MaxChunkSizeWan
	}
}
// intToBytes converts i to an int8 and returns its one-byte binary encoding.
// Values outside the int8 range are truncated by the conversion.
func (g *gelfUDP) intToBytes(i int) ([]byte, error) {
	var encoded bytes.Buffer
	if err := binary.Write(&encoded, binary.LittleEndian, int8(i)); err != nil {
		return nil, err
	}
	return encoded.Bytes(), nil
}
// compress returns b compressed with zlib. On failure an empty buffer and the
// error are returned.
func (g *gelfUDP) compress(b []byte) (bytes.Buffer, error) {
	var out bytes.Buffer
	zw := zlib.NewWriter(&out)

	_, err := zw.Write(b)
	if err == nil {
		// Closing flushes the remaining compressed data into out.
		err = zw.Close()
	}
	if err != nil {
		return bytes.Buffer{}, err
	}
	return out, nil
}
// Connect dials the endpoint over UDP and stores the resulting connection.
// The stored connection is left untouched when dialing fails.
func (g *gelfUDP) Connect() error {
	udpConn, err := g.dialer.Dial("udp", g.gelfConfig.Endpoint)
	if err == nil {
		g.conn = udpConn
	}
	return err
}
// send writes b to the connection, connecting first if necessary. After a
// failed write the connection is closed and dropped so that the next call
// reconnects.
func (g *gelfUDP) send(b []byte) error {
	if g.conn == nil {
		if err := g.Connect(); err != nil {
			return err
		}
	}

	if _, err := g.conn.Write(b); err != nil {
		_ = g.conn.Close() // best effort, the write error is what gets reported
		g.conn = nil
		return err
	}
	return nil
}
// Write sends message uncompressed over TCP. It returns len(message) on
// success, or 0 and the error from sending.
func (g *gelfTCP) Write(message []byte) (n int, err error) {
	if err = g.send(message); err != nil {
		return 0, err
	}
	return len(message), nil
}
// Close closes the current connection, if any, and forgets it so that the
// next send reconnects. It returns the error of closing the connection and
// nil when there was nothing to close.
func (g *gelfTCP) Close() (err error) {
	if g.conn == nil {
		return nil
	}

	err = g.conn.Close()
	g.conn = nil
	return err
}
// Connect dials the endpoint over TCP and stores the resulting connection.
// TLS is used when a TLS configuration is present. The stored connection is
// left untouched when dialing fails.
func (g *gelfTCP) Connect() error {
	dial := func() (net.Conn, error) {
		if g.tlsConfig != nil {
			return tls.DialWithDialer(g.dialer, "tcp", g.gelfConfig.Endpoint, g.tlsConfig)
		}
		return g.dialer.Dial("tcp", g.gelfConfig.Endpoint)
	}

	conn, err := dial()
	if err != nil {
		return err
	}
	g.conn = conn
	return nil
}
// send writes b followed by a single zero byte (the message delimiter) to the
// connection, connecting first if necessary. After a failed write the
// connection is closed and dropped so that the next call reconnects.
func (g *gelfTCP) send(b []byte) error {
	if g.conn == nil {
		if err := g.Connect(); err != nil {
			return err
		}
	}

	// The payload goes first, then the message delimiter.
	for _, part := range [][]byte{b, {0}} {
		if _, err := g.conn.Write(part); err != nil {
			_ = g.conn.Close() // best effort, the write error is what gets reported
			g.conn = nil
			return err
		}
	}
	return nil
}
// Graylog is the output plugin that sends metrics as GELF messages to one or
// more servers.
type Graylog struct {
	// Servers lists the endpoints to send to; Connect falls back to
	// "localhost:12201" when the list is empty.
	Servers []string `toml:"servers"`
	// ShortMessageField names the metric field whose value is used as the
	// GELF short_message.
	ShortMessageField string `toml:"short_message_field"`
	// NameFieldNoPrefix emits the metric name under "name" instead of "_name".
	NameFieldNoPrefix bool `toml:"name_field_noprefix"`
	// Timeout is the dial timeout used when connecting to the servers.
	Timeout config.Duration `toml:"timeout"`
	tlsint.ClientConfig

	// writer fans every message out to all connected servers.
	writer io.Writer
	// closers holds the per-server writers that Close shuts down.
	closers []io.WriteCloser
}
// sampleConfig is the sample TOML configuration returned by SampleConfig.
// Every option shown here uses the key that the Graylog struct tags actually
// accept; in particular the backward-compatibility switch for the metric name
// is spelled "name_field_noprefix".
var sampleConfig = `
## Endpoints for your graylog instances.
servers = ["udp://127.0.0.1:12201"]
## Connection timeout.
# timeout = "5s"
## The field to use as the GELF short_message, if unset the static string
## "telegraf" will be used.
## example: short_message_field = "message"
# short_message_field = ""
## According to GELF payload specification, additional fields names must be prefixed
## with an underscore. Previous versions did not prefix custom field 'name' with underscore.
## Set to true for backward compatibility.
# name_field_noprefix = false
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
`
// Connect creates and connects one GELF writer per configured server and
// installs a writer that forwards every message to all of them.
//
// When no server is configured, "localhost:12201" is used. If the TLS
// configuration is invalid or any server cannot be reached, an error is
// returned; connections opened earlier during the same call are closed again
// so that a failed Connect neither leaks sockets nor leaves half-registered
// writers behind. The returned connection error wraps the underlying cause.
func (g *Graylog) Connect() error {
	if len(g.Servers) == 0 {
		g.Servers = append(g.Servers, "localhost:12201")
	}

	tlsCfg, err := g.ClientConfig.TLSConfig()
	if err != nil {
		return err
	}

	dialer := &net.Dialer{Timeout: time.Duration(g.Timeout)}
	writers := make([]io.Writer, 0, len(g.Servers))
	opened := make([]io.WriteCloser, 0, len(g.Servers))

	for _, server := range g.Servers {
		w := newGelfWriter(gelfConfig{Endpoint: server}, dialer, tlsCfg)
		if err := w.Connect(); err != nil {
			// Roll back the connections established by this call.
			for _, c := range opened {
				_ = c.Close() // best effort, the connect error is reported
			}
			return fmt.Errorf("failed to connect to server [%s]: %w", server, err)
		}
		writers = append(writers, w)
		opened = append(opened, w)
	}

	// Publish the new state only after every server connected successfully.
	g.closers = append(g.closers, opened...)
	g.writer = io.MultiWriter(writers...)
	return nil
}
// Close shuts down every registered per-server writer. Errors from closing
// are deliberately ignored; the method always returns nil.
func (g *Graylog) Close() error {
	for i := range g.closers {
		_ = g.closers[i].Close()
	}
	return nil
}
// SampleConfig returns the sample configuration text of the plugin.
func (g *Graylog) SampleConfig() string {
	return sampleConfig
}
// Description returns a one-line summary of what the plugin does.
func (g *Graylog) Description() string {
	const description = "Send telegraf metrics to graylog"
	return description
}
// Write serializes every metric into GELF messages and sends each message
// through the writer installed by Connect.
//
// Processing stops at the first failure. Serialization errors are returned
// unchanged; write errors are wrapped (together with the offending message)
// so that callers can still inspect the underlying cause with errors.Is and
// errors.As.
func (g *Graylog) Write(metrics []telegraf.Metric) error {
	for _, metric := range metrics {
		values, err := g.serialize(metric)
		if err != nil {
			return err
		}

		for _, value := range values {
			if _, err := g.writer.Write([]byte(value)); err != nil {
				return fmt.Errorf("error writing message: %q, %w", value, err)
			}
		}
	}
	return nil
}
// serialize converts metric into a single JSON-encoded GELF message and
// returns it as a one-element slice.
//
// The message carries version "1.1", the metric time as seconds (with
// fraction), the short message "telegraf" and the metric name under "_name"
// (or "name" when NameFieldNoPrefix is set). The host is taken from the
// "host" tag or, if that tag is missing, from os.Hostname. All other tags and
// all fields are added afterwards, in that order: keys listed in
// defaultSpecFields keep their name, every other key is prefixed with "_".
// The field named by ShortMessageField replaces the short message.
//
// An empty slice and an error are returned when the hostname lookup or the
// JSON encoding fails.
func (g *Graylog) serialize(metric telegraf.Metric) ([]string, error) {
	host, found := metric.GetTag("host")
	if !found {
		var err error
		if host, err = os.Hostname(); err != nil {
			return []string{}, err
		}
	}

	msg := map[string]interface{}{
		"version":       "1.1",
		"timestamp":     float64(metric.Time().UnixNano()) / 1_000_000_000,
		"short_message": "telegraf",
		"host":          host,
	}
	if g.NameFieldNoPrefix {
		msg["name"] = metric.Name()
	} else {
		msg["_name"] = metric.Name()
	}

	// gelfKey maps a tag or field name to the key used in the message.
	gelfKey := func(name string) string {
		if fieldInSpec(name) {
			return name
		}
		return "_" + name
	}

	for _, tag := range metric.TagList() {
		// The host tag has already been handled above.
		if tag.Key != "host" {
			msg[gelfKey(tag.Key)] = tag.Value
		}
	}

	// Fields are applied after tags, so they win on identical keys.
	for _, field := range metric.FieldList() {
		if field.Key == g.ShortMessageField {
			msg["short_message"] = field.Value
			continue
		}
		msg[gelfKey(field.Key)] = field.Value
	}

	payload, err := ejson.Marshal(msg)
	if err != nil {
		return []string{}, err
	}
	return []string{string(payload)}, nil
}
// fieldInSpec reports whether field is one of the names listed in
// defaultSpecFields. The comparison is exact and case-sensitive.
func fieldInSpec(field string) bool {
	for i := 0; i < len(defaultSpecFields); i++ {
		if defaultSpecFields[i] == field {
			return true
		}
	}
	return false
}
func init() {
outputs.Add("graylog", func() telegraf.Output {
return &Graylog{
Timeout: config.Duration(defaultTimeout),
}
})
}