internal/output/udp/udp.go (45 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more agreements. // Elasticsearch B.V. licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. package tcp import ( "context" "net" "golang.org/x/time/rate" "github.com/elastic/stream/internal/output" ) const burst = 1024 * 1024 func init() { output.Register("udp", New) } type Output struct { opts *output.Options conn *net.UDPConn ctx context.Context limit *rate.Limiter } func New(opts *output.Options) (output.Output, error) { return &Output{ opts: opts, limit: rate.NewLimiter(rate.Limit(opts.RateLimit), burst), }, nil } func (o *Output) DialContext(ctx context.Context) error { udpAddr, err := net.ResolveUDPAddr("udp", o.opts.Addr) if err != nil { return err } conn, err := net.DialUDP("udp", nil, udpAddr) if err != nil { return err } o.conn = conn o.ctx = ctx return nil } func (o *Output) Close() error { return o.conn.Close() } func (o *Output) Write(b []byte) (int, error) { if err := o.limit.WaitN(o.ctx, len(b)); err != nil { return 0, err } return o.conn.Write(b) }