// plugins/flusher/http/flusher_http.go
// Copyright 2022 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"bytes"
"compress/gzip"
"crypto/rand"
"errors"
"fmt"
"io"
"math/big"
"net/http"
"net/url"
"sync"
"time"
"github.com/golang/snappy"
"github.com/alibaba/ilogtail/pkg/fmtstr"
"github.com/alibaba/ilogtail/pkg/helper"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/pipeline/extensions"
"github.com/alibaba/ilogtail/pkg/protocol"
converter "github.com/alibaba/ilogtail/pkg/protocol/converter"
)
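
// Defaults and header names used when building HTTP requests.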
const (
defaultTimeout = time.Minute
contentTypeHeader = "Content-Type"
defaultContentType = "application/octet-stream"
contentEncodingHeader = "Content-Encoding"
)
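
// contentTypeMaps maps converter encodings to the corresponding Content-Type header value.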
var contentTypeMaps = map[string]string{
converter.EncodingJSON: "application/json",
converter.EncodingProtobuf: defaultContentType,
converter.EncodingNone: defaultContentType,
converter.EncodingCustom: defaultContentType,
}
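
// supportedCompressionType enumerates the Content-Encoding values this flusher can produce.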
var supportedCompressionType = map[string]any{
"gzip": nil,
"snappy": nil,
}
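
// retryConfig controls how failed flush requests are retried.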
type retryConfig struct {
Enable bool // Whether to retry on failure, default is true
MaxRetryTimes int // Max retry times, default is 3
InitialDelay time.Duration // Delay before the first retry, default is 1s
MaxDelay time.Duration // Max delay between retries, default is 30s
}
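
// Client is the minimal HTTP client interface the flusher depends on;
// *http.Client satisfies it, and tests can inject a fake via SetHTTPClient.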
type Client interface {
Do(req *http.Request) (*http.Response, error)
}
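
// FlusherHTTP flushes pipeline data to a remote endpoint over HTTP.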
type FlusherHTTP struct {
RemoteURL string // RemoteURL to request
Headers map[string]string // Headers to append to the http request
Query map[string]string // Query parameters to append to the http request
Timeout time.Duration // Request timeout, default is 60s
Retry retryConfig // Retry strategy, default is 3 retries with delays starting at 1 second and capped at 30 seconds
Encoder *extensions.ExtensionConfig // Encoder defines which protocol and format to encode to
Convert helper.ConvertConfig // Convert defines which protocol and format to convert to
Concurrency int // How many requests can be performed concurrently
MaxConnsPerHost int // MaxConnsPerHost for http.Transport
MaxIdleConnsPerHost int // MaxIdleConnsPerHost for http.Transport
IdleConnTimeout time.Duration // IdleConnTimeout for http.Transport
WriteBufferSize int // WriteBufferSize for http.Transport
Authenticator *extensions.ExtensionConfig // name and options of the extensions.ClientAuthenticator extension to use
FlushInterceptor *extensions.ExtensionConfig // name and options of the extensions.FlushInterceptor extension to use
AsyncIntercept bool // If true, apply the flush interceptor asynchronously in the flush workers instead of in Export
RequestInterceptors []extensions.ExtensionConfig // custom request interceptor settings
QueueCapacity int // Capacity of the flush task queue
DropEventWhenQueueFull bool // If true, pipeline events will be dropped when the queue is full
Compression string // Compression type; gzip and snappy are supported at the moment
varKeys []string
context pipeline.Context
encoder extensions.Encoder
converter *converter.Converter
client Client
interceptor extensions.FlushInterceptor
queue chan interface{}
counter sync.WaitGroup
}
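
// NewHTTPFlusher returns a FlusherHTTP with sensible defaults: a 1024-element queue,
// a 60s timeout, a single flush worker, JSON custom-single conversion, and retry enabled.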
func NewHTTPFlusher() *FlusherHTTP {
return &FlusherHTTP{
QueueCapacity: 1024,
Timeout: defaultTimeout,
Concurrency: 1,
Convert: helper.ConvertConfig{
Protocol: converter.ProtocolCustomSingle,
Encoding: converter.EncodingJSON,
IgnoreUnExpectedData: true,
},
Retry: retryConfig{
Enable: true,
MaxRetryTimes: 3,
InitialDelay: time.Second,
MaxDelay: 30 * time.Second,
},
}
}
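
// Description returns a short description of the plugin.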
func (f *FlusherHTTP) Description() string {
return "http flusher for ilogtail"
}
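
// Init validates the configuration, sets up the encoder or converter, the optional
// flush interceptor, the HTTP client and the task queue, and starts Concurrency flush workers.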
func (f *FlusherHTTP) Init(context pipeline.Context) error {
f.context = context
logger.Info(f.context.GetRuntimeContext(), "http flusher init", "initializing")
if f.RemoteURL == "" {
err := errors.New("remoteURL is empty")
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init fail, error", err)
return err
}
if f.Concurrency < 1 {
err := errors.New("concurrency must be greater than zero")
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher check concurrency fail, error", err)
return err
}
var err error
if err = f.initEncoder(); err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init encoder fail, error", err)
return err
}
if err = f.initConverter(); err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init converter fail, error", err)
return err
}
if f.FlushInterceptor != nil {
var ext pipeline.Extension
ext, err = f.context.GetExtension(f.FlushInterceptor.Type, f.FlushInterceptor.Options)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init filter fail, error", err)
return err
}
interceptor, ok := ext.(extensions.FlushInterceptor)
if !ok {
err = fmt.Errorf("filter(%s) not implement interface extensions.FlushInterceptor", f.FlushInterceptor)
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init filter fail, error", err)
return err
}
f.interceptor = interceptor
}
err = f.initHTTPClient()
if err != nil {
return err
}
if f.QueueCapacity <= 0 {
f.QueueCapacity = 1024
}
f.queue = make(chan interface{}, f.QueueCapacity)
for i := 0; i < f.Concurrency; i++ {
go f.runFlushTask()
}
f.buildVarKeys()
f.fillRequestContentType()
logger.Info(f.context.GetRuntimeContext(), "http flusher init", "initialized",
"timeout", f.Timeout,
"compression", f.Compression)
return nil
}
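
// Flush enqueues v1 log groups; the actual HTTP requests are performed asynchronously by the flush workers.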
func (f *FlusherHTTP) Flush(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error {
for _, logGroup := range logGroupList {
f.addTask(logGroup)
}
return nil
}
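
// Export enqueues v2 pipeline group events. When AsyncIntercept is false,
// the optional flush interceptor is applied here before enqueueing.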
func (f *FlusherHTTP) Export(groupEventsArray []*models.PipelineGroupEvents, ctx pipeline.PipelineContext) error {
for _, groupEvents := range groupEventsArray {
if !f.AsyncIntercept && f.interceptor != nil {
groupEvents = f.interceptor.Intercept(groupEvents)
// skip groupEvents that are nil or empty.
if groupEvents == nil || len(groupEvents.Events) == 0 {
continue
}
}
f.addTask(groupEvents)
}
return nil
}
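
// SetUrgent is a no-op for this flusher.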
func (f *FlusherHTTP) SetUrgent(flag bool) {
}
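
// IsReady reports whether the flusher can accept data, i.e. whether the HTTP client has been initialized.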
func (f *FlusherHTTP) IsReady(projectName string, logstoreName string, logstoreKey int64) bool {
return f.client != nil
}
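
// Stop waits for all in-flight tasks to finish and closes the queue, which stops the flush workers.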
func (f *FlusherHTTP) Stop() error {
f.counter.Wait()
close(f.queue)
return nil
}
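
// SetHTTPClient replaces the underlying HTTP client, mainly useful in tests.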
func (f *FlusherHTTP) SetHTTPClient(client Client) {
f.client = client
}
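
// initEncoder resolves the optional extensions.Encoder extension configured in Encoder.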
func (f *FlusherHTTP) initEncoder() error {
if f.Encoder == nil {
return nil
}
ext, err := f.context.GetExtension(f.Encoder.Type, f.Encoder.Options)
if err != nil {
return fmt.Errorf("get extension failed, error: %w", err)
}
enc, ok := ext.(extensions.Encoder)
if !ok {
return fmt.Errorf("filter(%s) not implement interface extensions.Encoder", f.Encoder)
}
f.encoder = enc
return nil
}
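
// initConverter builds the protocol converter; a failure is only fatal when no encoder
// is configured, since the encoder path does not need the converter.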
func (f *FlusherHTTP) initConverter() error {
conv, err := f.getConverter()
if err == nil {
f.converter = conv
return nil
}
if f.encoder == nil {
// e.g. the Prometheus http flusher does not configure helper.ConvertConfig,
// but it must configure an encoder (i.e. the prometheus encoder config).
// If the converter cannot be created and the flusher has no encoder either,
// the flusher cannot work, so return the error.
return err
}
return nil
}
func (f *FlusherHTTP) getConverter() (*converter.Converter, error) {
return converter.NewConverterWithSep(f.Convert.Protocol, f.Convert.Encoding, f.Convert.Separator, f.Convert.IgnoreUnExpectedData, f.Convert.TagFieldsRename, f.Convert.ProtocolFieldsRename, f.context.GetPipelineScopeConfig())
}
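
// initHTTPClient clones the default transport, applies the transport tuning options,
// wraps it with the configured request interceptors and the optional authenticator,
// and builds the HTTP client.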
func (f *FlusherHTTP) initHTTPClient() error {
transport := http.DefaultTransport
if dt, ok := transport.(*http.Transport); ok {
dt = dt.Clone()
if f.Concurrency > dt.MaxIdleConnsPerHost {
dt.MaxIdleConnsPerHost = f.Concurrency + 1
}
if f.MaxConnsPerHost > dt.MaxConnsPerHost {
dt.MaxConnsPerHost = f.MaxConnsPerHost
}
if f.MaxIdleConnsPerHost > dt.MaxIdleConnsPerHost {
dt.MaxIdleConnsPerHost = f.MaxIdleConnsPerHost
}
if f.IdleConnTimeout > dt.IdleConnTimeout {
dt.IdleConnTimeout = f.IdleConnTimeout
}
if f.WriteBufferSize > 0 {
dt.WriteBufferSize = f.WriteBufferSize
}
transport = dt
}
var err error
transport, err = f.initRequestInterceptors(transport)
if err != nil {
return err
}
if f.Authenticator != nil {
var auth pipeline.Extension
auth, err = f.context.GetExtension(f.Authenticator.Type, f.Authenticator.Options)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init authenticator fail, error", err)
return err
}
ca, ok := auth.(extensions.ClientAuthenticator)
if !ok {
err = fmt.Errorf("authenticator(%s) not implement interface extensions.ClientAuthenticator", f.Authenticator)
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init authenticator fail, error", err)
return err
}
transport, err = ca.RoundTripper(transport)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init authenticator fail, error", err)
return err
}
}
f.client = &http.Client{
Timeout: f.Timeout,
Transport: transport,
}
return nil
}
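
// initRequestInterceptors wraps the transport with the configured request interceptors
// in reverse order, so the first configured interceptor becomes the outermost RoundTripper.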
func (f *FlusherHTTP) initRequestInterceptors(transport http.RoundTripper) (http.RoundTripper, error) {
for i := len(f.RequestInterceptors) - 1; i >= 0; i-- {
setting := f.RequestInterceptors[i]
ext, err := f.context.GetExtension(setting.Type, setting.Options)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init request interceptor fail, error", err)
return nil, err
}
interceptor, ok := ext.(extensions.RequestInterceptor)
if !ok {
err = fmt.Errorf("interceptor(%s) with type %T not implement interface extensions.RequestInterceptor", setting.Type, ext)
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init request interceptor fail, error", err)
return nil, err
}
transport, err = interceptor.RoundTripper(transport)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init request interceptor fail, error", err)
return nil, err
}
}
return transport, nil
}
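
// addTask enqueues a flush task; when DropEventWhenQueueFull is set and the queue is full,
// the task is dropped with a warning instead of blocking.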
func (f *FlusherHTTP) addTask(log interface{}) {
f.counter.Add(1)
if f.DropEventWhenQueueFull {
select {
case f.queue <- log:
default:
f.counter.Done()
logger.Warningf(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher dropped a group event since the queue is full")
}
} else {
f.queue <- log
}
}
func (f *FlusherHTTP) countDownTask() {
f.counter.Done()
}
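
// runFlushTask is the worker loop: it drains the queue and either encodes (when an
// encoder is configured) or converts each task before flushing it.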
func (f *FlusherHTTP) runFlushTask() {
flushTaskFn, action := f.convertAndFlush, "convert"
if f.encoder != nil {
flushTaskFn, action = f.encodeAndFlush, "encode"
}
for data := range f.queue {
err := flushTaskFn(data)
if err != nil {
logger.Errorf(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM",
"http flusher failed to %s or flush data, data dropped, error: %s", action, err.Error())
}
}
}
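
// encodeAndFlush encodes a v2 event group with the configured encoder and flushes each resulting shard with retries.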
func (f *FlusherHTTP) encodeAndFlush(event any) error {
defer f.countDownTask()
var data [][]byte
var err error
switch v := event.(type) {
case *models.PipelineGroupEvents:
data, err = f.encoder.EncodeV2(v)
default:
return errors.New("unsupported event type")
}
if err != nil {
return fmt.Errorf("http flusher encode event data fail, error: %w", err)
}
for _, shard := range data {
if err = f.flushWithRetry(shard, nil); err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM",
"http flusher failed flush data after retry, data dropped, error", err,
"remote url", f.RemoteURL)
}
}
return nil
}
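
// convertAndFlush converts a v1 log group or a v2 event group with the configured
// converter and flushes the resulting payload(s) with retries.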
func (f *FlusherHTTP) convertAndFlush(data interface{}) error {
defer f.countDownTask()
var logs interface{}
var varValues []map[string]string
var err error
switch v := data.(type) {
case *protocol.LogGroup:
logs, varValues, err = f.converter.ToByteStreamWithSelectedFields(v, f.varKeys)
case *models.PipelineGroupEvents:
if f.AsyncIntercept && f.interceptor != nil {
v = f.interceptor.Intercept(v)
if v == nil || len(v.Events) == 0 {
return nil
}
}
logs, varValues, err = f.converter.ToByteStreamWithSelectedFieldsV2(v, f.varKeys)
default:
return fmt.Errorf("unsupport data type")
}
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher converter log fail, error", err)
return err
}
switch rows := logs.(type) {
case [][]byte:
for idx, data := range rows {
body, values := data, varValues[idx]
err = f.flushWithRetry(body, values)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher failed to flush data after retry, data dropped, error", err)
}
}
return nil
case []byte:
err = f.flushWithRetry(rows, nil)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher failed to flush data after retry, error", err)
}
return err
default:
err = fmt.Errorf("not supported logs type [%T]", logs)
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher failed flush data, error", err)
return err
}
}
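
// flushWithRetry sends data, retrying retryable failures up to Retry.MaxRetryTimes
// with jittered exponential backoff, and finally returns the pooled buffer.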
func (f *FlusherHTTP) flushWithRetry(data []byte, varValues map[string]string) error {
var err error
for i := 0; i <= f.Retry.MaxRetryTimes; i++ {
ok, retryable, e := f.flush(data, varValues)
if ok || !retryable || !f.Retry.Enable {
err = e
break
}
err = e
<-time.After(f.getNextRetryDelay(i))
}
converter.PutPooledByteBuf(&data)
return err
}
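
// getNextRetryDelay returns the backoff delay for the given attempt: the initial delay
// doubled per attempt, capped at Retry.MaxDelay and jittered into [delay/2, delay].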
func (f *FlusherHTTP) getNextRetryDelay(retryTime int) time.Duration {
delay := f.Retry.InitialDelay * 1 << time.Duration(retryTime)
if delay > f.Retry.MaxDelay {
delay = f.Retry.MaxDelay
}
// apply roughly equally distributed jitter in the second half of the interval, so that
// the wait time falls into the interval [delay/2, delay]
half := int64(delay / 2)
jitter, err := rand.Int(rand.Reader, big.NewInt(half+1))
if err != nil {
return delay
}
return time.Duration(half + jitter.Int64())
}
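
// compressData wraps the payload in a reader, gzip- or snappy-compressing it when the
// Content-Encoding header asks for it.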
func (f *FlusherHTTP) compressData(data []byte) (io.Reader, error) {
var reader io.Reader = bytes.NewReader(data)
if compressionType, ok := f.Headers[contentEncodingHeader]; ok {
switch compressionType {
case "gzip":
var buf bytes.Buffer
gw := gzip.NewWriter(&buf)
if _, err := gw.Write(data); err != nil {
return nil, err
}
if err := gw.Close(); err != nil {
return nil, err
}
reader = &buf
case "snappy":
compressedData := snappy.Encode(nil, data)
reader = bytes.NewReader(compressedData)
default:
}
}
return reader, nil
}
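
// flush performs a single HTTP POST to RemoteURL, expanding any variable placeholders
// in query parameters and headers. It reports success, whether a failure is retryable,
// and the error, if any.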
func (f *FlusherHTTP) flush(data []byte, varValues map[string]string) (ok, retryable bool, err error) {
reader, err := f.compressData(data)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "create reader error", err)
return false, false, err
}
req, err := http.NewRequest(http.MethodPost, f.RemoteURL, reader)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher create request fail, error", err)
return false, false, err
}
if len(f.Query) > 0 {
values := req.URL.Query()
for k, v := range f.Query {
if len(f.varKeys) == 0 {
values.Add(k, v)
continue
}
fv, ferr := fmtstr.FormatTopic(varValues, v)
if ferr != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher format query fail, error", ferr)
} else {
v = *fv
}
values.Add(k, v)
}
req.URL.RawQuery = values.Encode()
}
for k, v := range f.Headers {
if len(f.varKeys) == 0 {
req.Header.Add(k, v)
continue
}
fv, ferr := fmtstr.FormatTopic(varValues, v)
if ferr != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher format header fail, error", ferr)
} else {
v = *fv
}
req.Header.Add(k, v)
}
response, err := f.client.Do(req)
if logger.DebugFlag() {
logger.Debugf(f.context.GetRuntimeContext(), "request [method]: %v; [header]: %v; [url]: %v; [body]: %v", req.Method, req.Header, req.URL, string(data))
}
if err != nil {
urlErr, ok := err.(*url.Error)
retry := false
if ok && (urlErr.Timeout() || urlErr.Temporary()) {
retry = true
}
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALRAM", "http flusher send request fail, error", err)
return false, retry, err
}
body, err := io.ReadAll(response.Body)
if err != nil {
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALRAM", "http flusher read response fail, error", err)
return false, false, err
}
err = response.Body.Close()
if err != nil {
logger.Warning(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher close response body fail, error", err)
return false, false, err
}
switch response.StatusCode / 100 {
case 2:
return true, false, nil
case 5:
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher write data returned error, url", req.URL.String(), "status", response.Status, "body", string(body))
return false, true, fmt.Errorf("err status returned: %v", response.Status)
default:
if response.StatusCode == http.StatusUnauthorized || response.StatusCode == http.StatusForbidden {
return false, true, fmt.Errorf("err status returned: %v", response.Status)
}
logger.Error(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher write data returned error, url", req.URL.String(), "status", response.Status, "body", string(body))
return false, false, fmt.Errorf("unexpected status returned: %v", response.Status)
}
}
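
// buildVarKeys collects the variable placeholders referenced in Query and Headers values
// so they can be resolved per payload at flush time.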
func (f *FlusherHTTP) buildVarKeys() {
cache := map[string]struct{}{}
defines := []map[string]string{f.Query, f.Headers}
for _, define := range defines {
for _, v := range define {
keys, err := fmtstr.CompileKeys(v)
if err != nil {
logger.Warning(f.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "http flusher init varKeys fail, err", err)
}
for _, key := range keys {
cache[key] = struct{}{}
}
}
}
varKeys := make([]string, 0, len(cache))
for k := range cache {
varKeys = append(varKeys, k)
}
f.varKeys = varKeys
}
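
// fillRequestContentType sets the Content-Encoding header from Compression (if supported)
// and a default Content-Type derived from the convert encoding when none is configured.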
func (f *FlusherHTTP) fillRequestContentType() {
if f.Headers == nil {
f.Headers = make(map[string]string, 4)
}
if f.Compression != "" {
if _, ok := supportedCompressionType[f.Compression]; ok {
f.Headers[contentEncodingHeader] = f.Compression
}
}
_, ok := f.Headers[contentTypeHeader]
if ok {
return
}
contentType, ok := contentTypeMaps[f.Convert.Encoding]
if !ok {
contentType = defaultContentType
}
f.Headers[contentTypeHeader] = contentType
}
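
// init registers the plugin under the name "flusher_http".
// A rough configuration sketch (field names mirror the FlusherHTTP struct; the RemoteURL
// below is a placeholder and the surrounding YAML layout is assumed to follow the usual
// iLogtail pipeline config format):
//
//	flushers:
//	  - Type: flusher_http
//	    RemoteURL: http://localhost:8086/write
//	    Compression: gzip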
func init() {
pipeline.Flushers["flusher_http"] = func() pipeline.Flusher {
return NewHTTPFlusher()
}
}