plugins/input/http/input_http.go (225 lines of code) (raw):
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package http
import (
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"time"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/util"
)
// Response sends HTTP request and collects HTTP response metrics.
type Response struct {
Addresses []string
AddressPath string
FlushAddressIntervalSec int
Body string
Method string
ResponseTimeoutMs int
PerAddressSleepMs int
Headers map[string]string
FollowRedirects bool
ResponseStringMatch string
IncludeBody bool
// Path to CA file
SSLCA string
// Path to host cert file
SSLCert string
// Path to cert key file
SSLKey string
// Use SSL but skip chain & host verification
InsecureSkipVerify bool
compiledStringMatch *regexp.Regexp
client *http.Client
context pipeline.Context
tags map[string]string
lastLoadAddressTime time.Time
}
func (h *Response) loadAddresses() error {
if h.Addresses == nil {
h.Addresses = make([]string, 0, 1)
}
if len(h.AddressPath) > 0 {
addresses, err := util.ReadLines(h.AddressPath)
h.lastLoadAddressTime = time.Now()
if err != nil {
return err
}
if addresses == nil {
return fmt.Errorf("no url in this file: %s", h.AddressPath)
}
h.Addresses = addresses
}
if len(h.Addresses) == 0 {
h.Addresses = append(h.Addresses, "http://localhost")
}
for _, address := range h.Addresses {
addr, err := url.Parse(address)
if err != nil {
return err
}
if addr.Scheme != "http" && addr.Scheme != "https" {
return errors.New("Only http and https are supported")
}
}
return nil
}
func (h *Response) Init(context pipeline.Context) (int, error) {
h.context = context
// Set default values
if h.ResponseTimeoutMs < 1 {
h.ResponseTimeoutMs = 5000
}
if h.ResponseTimeoutMs > 30000 {
logger.Info(h.context.GetRuntimeContext(), "can't config ResponseTimeoutMs for more than 30s, reset value to 30000. your config is", h.ResponseTimeoutMs)
h.ResponseTimeoutMs = 30000
}
if h.FlushAddressIntervalSec < 1 {
h.FlushAddressIntervalSec = 60
}
if h.PerAddressSleepMs < 1 {
h.PerAddressSleepMs = 100
}
// Check send and expected string
if h.Method == "" {
h.Method = "GET"
}
err := h.loadAddresses()
if err != nil {
return 0, err
}
// Prepare data
h.tags = map[string]string{"_method_": h.Method}
if h.client == nil {
client, err := h.createHTTPClient()
if err != nil {
return 0, err
}
h.client = client
}
return 0, nil
}
// Description returns the plugin Description
func (h *Response) Description() string {
return "HTTP/HTTPS request given an address a method and a timeout"
}
// var sampleConfig = `
// ## Server address (default http://localhost)
// # address = "http://localhost"
// ## Set response_timeout (default 5 seconds)
// # response_timeout = "5s"
// ## HTTP Request Method
// # method = "GET"
// ## Whether to follow redirects from the server (defaults to false)
// # follow_redirects = false
// ## Optional HTTP Request Body
// # body = '''
// # {'fake':'data'}
// # '''
// ## Optional substring or regex match in body of the response
// # response_string_match = "\"service_status\": \"up\""
// # response_string_match = "ok"
// # response_string_match = "\".*_status\".?:.?\"up\""
// ## Optional SSL Config
// # ssl_ca = "/etc/telegraf/ca.pem"
// # ssl_cert = "/etc/telegraf/cert.pem"
// # ssl_key = "/etc/telegraf/key.pem"
// ## Use SSL but skip chain & host verification
// # insecure_skip_verify = false
// ## HTTP Request Headers (all values must be strings)
// # [inputs.http_response.headers]
// # Host = "github.com"
// `
// ErrRedirectAttempted indicates that a redirect occurred
var ErrRedirectAttempted = errors.New("redirect")
// createHTTPClient creates an http client which will timeout at the specified
// timeout period and can follow redirects if specified
func (h *Response) createHTTPClient() (*http.Client, error) {
tlsCfg, err := util.GetTLSConfig(
h.SSLCert, h.SSLKey, h.SSLCA, h.InsecureSkipVerify)
if tlsCfg != nil {
logger.Debug(h.context.GetRuntimeContext(), "init ssl cfg", tlsCfg)
}
if err != nil {
return nil, err
}
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: tlsCfg,
},
Timeout: time.Duration(h.ResponseTimeoutMs) * time.Millisecond,
}
if !h.FollowRedirects {
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return ErrRedirectAttempted
}
}
return client, nil
}
// httpGather gathers all fields and returns any errors it encounters.
func (h *Response) httpGather(address string) (map[string]string, error) {
// Prepare fields
fields := map[string]string{"_address_": address}
var body io.Reader
if h.Body != "" {
body = strings.NewReader(h.Body)
}
request, err := http.NewRequest(h.Method, address, body)
if err != nil {
return nil, err
}
for key, val := range h.Headers {
if key == "__Host__" {
request.Host = val
} else {
request.Header.Add(key, val)
}
}
// Start Timer
start := time.Now()
resp, err := h.client.Do(request)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fields["_result_"] = "timeout"
return fields, nil
}
fields["_result_"] = "connection_failed"
if h.FollowRedirects {
return fields, nil
}
if urlError, ok := err.(*url.Error); !ok || urlError.Err != ErrRedirectAttempted {
return fields, nil
}
}
defer func() {
_, _ = io.Copy(io.Discard, resp.Body)
_ = resp.Body.Close()
}()
fields["_response_time_ms_"] = strconv.FormatFloat(float64(time.Since(start).Nanoseconds())/1000000., 'f', 3, 32)
fields["_http_response_code_"] = strconv.Itoa(resp.StatusCode)
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
logger.Error(h.context.GetRuntimeContext(), "HTTP_PARSE_ALARM", "Read body of HTTP response failed", err)
fields["_result_"] = "invalid_body"
fields["_response_match_"] = "no"
return fields, nil
}
// Check the response for a regex match.
if h.ResponseStringMatch != "" {
// Compile once and reuse
if h.compiledStringMatch == nil {
h.compiledStringMatch, err = regexp.Compile(h.ResponseStringMatch)
if err != nil {
logger.Error(h.context.GetRuntimeContext(), "HTTP_INIT_ALARM", "Compile regular expression faild", h.ResponseStringMatch, "error", err)
fields["_result_"] = "match_regex_invalid"
return fields, nil
}
}
if h.compiledStringMatch.Match(bodyBytes) {
fields["_result_"] = "success"
fields["_response_match_"] = "yes"
} else {
fields["_result_"] = "mismatch"
fields["_response_match_"] = "no"
}
} else {
fields["_result_"] = "success"
}
if h.IncludeBody {
fields["content"] = string(bodyBytes)
}
return fields, nil
}
// Collect gets all metric fields and tags and returns any errors it encounters
func (h *Response) Collect(collector pipeline.Collector) error {
// should not occur
if h.tags == nil || h.client == nil {
return nil
}
if len(h.AddressPath) > 0 {
curTime := time.Now()
if curTime.Sub(h.lastLoadAddressTime).Seconds() > float64(h.FlushAddressIntervalSec) {
err := h.loadAddresses()
if err != nil {
logger.Warning(h.context.GetRuntimeContext(), "HTTP_LOAD_ADDRESS_ALARM", "load address error, file", h.AddressPath, "error", err)
}
}
}
logger.Debug(h.context.GetRuntimeContext(), "collect addresses", h.Addresses)
// Gather data
for _, address := range h.Addresses {
fields, err := h.httpGather(address)
if err != nil {
logger.Warning(h.context.GetRuntimeContext(), "HTTP_COLLECT_ALARM", "collect error, address", address, "error", err)
}
if len(fields) > 0 {
// Add metrics
collector.AddData(h.tags, fields)
}
}
return nil
}
func init() {
pipeline.MetricInputs["metric_http"] = func() pipeline.MetricInput {
return &Response{}
}
}