internal/output/webhook/webhook.go (99 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package webhook
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"github.com/elastic/stream/internal/output"
)
func init() {
output.Register("webhook", New)
}
type Output struct {
opts *output.Options
client *http.Client
}
func New(opts *output.Options) (output.Output, error) {
if _, err := url.Parse(opts.Addr); err != nil {
return nil, fmt.Errorf("address must be a valid URL for webhook output: %w", err)
}
if opts.Timeout < 0 {
return nil, fmt.Errorf("timeout must not be negative: %v", opts.Timeout)
}
client := &http.Client{
Timeout: opts.Timeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: opts.InsecureTLS,
},
},
}
return &Output{opts: opts, client: client}, nil
}
func (o *Output) DialContext(ctx context.Context) error {
// Use a HEAD request to check if the service is ready.
req, err := http.NewRequestWithContext(ctx, http.MethodHead, o.opts.Addr, nil)
if err != nil {
return err
}
if o.opts.Username != "" && o.opts.Password != "" {
req.SetBasicAuth(o.opts.Username, o.opts.Password)
}
if err = setHeaders(req, o.opts.Headers); err != nil {
return err
}
resp, err := o.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
// Don't check the status code in case the endpoint does not support HEAD.
return nil
}
func (o *Output) Close() error {
o.client.CloseIdleConnections()
return nil
}
func (o *Output) Write(b []byte) (int, error) {
req, err := http.NewRequest(http.MethodPost, o.opts.Addr, bytes.NewReader(b))
if err != nil {
return 0, err
}
if o.opts.ContentType != "" {
req.Header.Set("Content-Type", o.opts.ContentType)
}
if o.opts.Username != "" && o.opts.Password != "" {
req.SetBasicAuth(o.opts.Username, o.opts.Password)
}
if err = setHeaders(req, o.opts.Headers); err != nil {
return 0, err
}
resp, err := o.client.Do(req)
if err != nil {
return 0, err
}
var buf bytes.Buffer
io.Copy(&buf, resp.Body)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
if buf.Len() == 0 {
buf.WriteString("no body")
}
return 0, fmt.Errorf("http post to webhook failed with http status %v %v: %s", resp.StatusCode, resp.Status, &buf)
}
return len(b), nil
}
func setHeaders(req *http.Request, headers []string) error {
for _, h := range headers {
parts := strings.SplitN(h, "=", 2)
switch len(parts) {
case 2:
req.Header.Set(parts[0], parts[1])
default:
return fmt.Errorf("failed to parse header %q", h)
}
}
return nil
}