internal/output/tcp/tcp.go (56 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more agreements. // Elasticsearch B.V. licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. package tcp import ( "context" "errors" "io" "net" "time" "github.com/elastic/stream/internal/output" ) func init() { output.Register("tcp", New) } type Output struct { opts *output.Options conn *net.TCPConn } func New(opts *output.Options) (output.Output, error) { return &Output{opts: opts}, nil } func (o *Output) DialContext(ctx context.Context) error { d := net.Dialer{Timeout: time.Second} conn, err := d.DialContext(ctx, "tcp", o.opts.Addr) if err != nil { return err } o.conn = conn.(*net.TCPConn) return nil } func (o *Output) Conn() net.Conn { return o.conn } func (o *Output) Close() error { if o.conn != nil { if err := o.conn.CloseWrite(); err != nil { return err } // drain to facilitate graceful close on the other side deadline := time.Now().Add(5 * time.Second) if err := o.conn.SetReadDeadline(deadline); err != nil { return err } buffer := make([]byte, 1024) for { _, err := o.conn.Read(buffer) if errors.Is(err, io.EOF) { break } else if err != nil { return err } } return o.conn.Close() } return nil } func (o *Output) Write(b []byte) (int, error) { return o.conn.Write(append(b, '\n')) }