plugins/input/httpserver/input_http_server.go (299 lines of code) (raw):
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httpserver
import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"sync"
"syscall"
"time"
"github.com/alibaba/ilogtail/pkg/helper"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/pipeline/extensions"
"github.com/alibaba/ilogtail/pkg/protocol/decoder/common"
)
// Plugin runner versions. ServiceHTTP.version holds one of these and
// ServeHTTP uses it to pick the decode/collect path.
const (
v1 = iota // set by Start; requests go through decoder.Decode and collector.AddRawLog
v2 // set by StartService; requests go through decoder.DecodeV2 and collectorV2.CollectList
)
// name is the key under which this input is registered in
// pipeline.ServiceInputs; it is also the first component of the dumper name
// built in Init.
const name = "service_http_server"
// ServiceHTTP is a service input that runs an HTTP server, parses and decodes
// each request body through a decoder extension, and hands the result to the
// pipeline collector. Exported fields are the plugin's configuration; their
// defaults are set by the factory registered in init.
type ServiceHTTP struct {
context pipeline.Context // plugin context stored by Init; used for logging
collector pipeline.Collector // v1 collector, set by Start
decoder extensions.Decoder // decoder extension resolved in Init
server *http.Server // set by start
listener net.Listener // set by start; closed by Stop
wg sync.WaitGroup // tracks the serving goroutine so Stop can wait for it
collectorV2 pipeline.PipelineCollector // v2 collector, set by StartService
version int8 // v1 or v2, selects the decode path in ServeHTTP
paramCount int // len(QueryParams)+len(HeaderParams), used as a map size hint
dumper *helper.Dumper // non-nil only when DumpData is true
DumpDataKeepFiles int // passed to helper.NewDumper; presumably how many dump files to keep — confirm against helper
DumpData bool // dumps received requests through helper.Dumper; intended only for developers validating data
Decoder string // name of the decoder extension to use, default is "ext_default_decoder"
Format string // protocol format; forwarded to the decoder and used to pick the default Path and the success response
Address string // listen address: "unix..." for a unix socket, "http"/"tcp" prefixed URLs, otherwise a raw TCP address; Init appends Path to it
Path string // request path; when empty Init fills in a default for OTLP and Pyroscope formats
ReadTimeoutSec int // http.Server.ReadTimeout, in seconds
ShutdownTimeoutSec int // timeout for server.Shutdown, in seconds
MaxBodySize int64 // requests whose ContentLength exceeds this are rejected with 413; also passed to the decoder
UnlinkUnixSock bool // when true, the unix socket path is unlinked before listening
FieldsExtend bool // forwarded to the decoder extension options
DisableUncompress bool // forwarded to the decoder extension options
AllowUnsafeMode bool // forwarded to the decoder extension options
Tags map[string]string // passed to decoder.Decode in v1; not applied in v2 yet (TODO)
// The parameters below only take effect for version v2, where the extracted
// values are merged into the metadata of every decoded group.
QueryParams []string // keys looked up with Request.FormValue
HeaderParams []string // header names looked up with Header.Get
QueryParamPrefix string // prepended to the key of each extracted query parameter
HeaderParamPrefix string // prepended to the key of each extracted header parameter
}
// Init stores the plugin context, resolves the decoder extension named by
// s.Decoder, fills in a default Path for formats that have one, appends Path
// to Address and, when DumpData is set, creates the request dumper.
//
// It returns an error when the extension cannot be obtained or does not
// implement extensions.Decoder. The int result is always 0.
func (s *ServiceHTTP) Init(context pipeline.Context) (int, error) {
	s.context = context

	// Options forwarded to the decoder extension.
	options := &struct {
		Format            string
		FieldsExtend      bool
		DisableUncompress bool
		AllowUnsafeMode   bool
	}{
		Format:            s.Format,
		FieldsExtend:      s.FieldsExtend,
		DisableUncompress: s.DisableUncompress,
		AllowUnsafeMode:   s.AllowUnsafeMode,
	}
	ext, err := context.GetExtension(s.Decoder, options)
	if err != nil {
		return 0, err
	}
	decoder, ok := ext.(extensions.Decoder)
	if !ok {
		return 0, fmt.Errorf("extension %s with type %T not implement extensions.Decoder", s.Decoder, ext)
	}
	s.decoder = decoder

	// Only formats with a well-known endpoint get a default path; for any
	// other format an empty Path is left as is.
	if s.Path == "" {
		switch s.Format {
		case common.ProtocolOTLPLogV1:
			s.Path = "/v1/logs"
		case common.ProtocolOTLPMetricV1:
			s.Path = "/v1/metrics"
		case common.ProtocolOTLPTraceV1:
			s.Path = "/v1/traces"
		case common.ProtocolPyroscope:
			s.Path = "/ingest"
		}
	}
	s.Address += s.Path

	// The arguments are key/value pairs, so use Info rather than Infof:
	// Infof would take "addr" as the format string and mangle the rest.
	logger.Info(context.GetRuntimeContext(), "addr", s.Address, "format", s.Format)

	s.paramCount = len(s.QueryParams) + len(s.HeaderParams)

	if s.DumpData {
		s.dumper = helper.NewDumper(strings.Join([]string{name, context.GetProject(), context.GetConfigName()}, "-"), s.DumpDataKeepFiles)
		s.dumper.Init()
	}
	return 0, nil
}
// Description returns a short human-readable description of the plugin.
func (s *ServiceHTTP) Description() string {
	const description = "HTTP service input plugin for logtail"
	return description
}
// Collect is a no-op that always returns nil. This input does not gather data
// here; data is collected from ServeHTTP as requests arrive.
func (s *ServiceHTTP) Collect(_ pipeline.Collector) error {
	return nil
}
// ServeHTTP handles one ingestion request.
//
// The request is rejected with 413 when its declared ContentLength exceeds
// MaxBodySize. Otherwise the body is read through the decoder's ParseRequest,
// optionally dumped, decoded according to the runner version (v1: raw logs,
// v2: event groups enriched with the configured query/header parameters) and
// handed to the collector. A decode failure is answered with 400.
//
// On success the response depends on Format: SLS gets 200 with an
// x-log-requestid header, Pyroscope and Prometheus get no explicit status
// (net/http then sends 200), everything else gets 204.
func (s *ServiceHTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.ContentLength > s.MaxBodySize {
		TooLarge(w)
		return
	}

	data, statusCode, err := s.decoder.ParseRequest(w, r, s.MaxBodySize)
	logger.Debugf(s.context.GetRuntimeContext(), "request [method] %v; [header] %v; [url] %v; [body len] %d", r.Method, r.Header, r.URL, len(data))

	// rejected records whether an error response has been written for the
	// status reported by the decoder.
	rejected := true
	switch statusCode {
	case http.StatusBadRequest:
		BadRequest(w)
	case http.StatusRequestEntityTooLarge:
		TooLarge(w)
	case http.StatusInternalServerError:
		InternalServerError(w)
	case http.StatusMethodNotAllowed:
		MethodNotAllowed(w)
	default:
		rejected = false
	}
	// Stop here on a parse error, and also when an error response was already
	// written: continuing would collect data from a rejected request and write
	// a second status line to the same response.
	if err != nil || rejected {
		logger.Warning(s.context.GetRuntimeContext(), "READ_BODY_FAIL_ALARM", "read body failed", err, "request", r.URL.String(), "status", statusCode)
		return
	}

	if s.dumper != nil {
		s.dumper.InputChannel() <- &helper.DumpData{
			Req: helper.DumpDataReq{
				Body:   data,
				URL:    r.URL.String(),
				Header: r.Header,
			},
		}
	}

	switch s.version {
	case v1:
		logs, err := s.decoder.Decode(data, r, s.Tags)
		if err != nil {
			logger.Warning(s.context.GetRuntimeContext(), "DECODE_BODY_FAIL_ALARM", "decode body failed", err, "request", r.URL.String())
			BadRequest(w)
			return
		}
		for _, log := range logs {
			s.collector.AddRawLog(log)
		}
	case v2:
		groups, err := s.decoder.DecodeV2(data, r)
		if err != nil {
			logger.Warning(s.context.GetRuntimeContext(), "DECODE_BODY_FAIL_ALARM", "decode body failed", err, "request", r.URL.String())
			BadRequest(w)
			return
		}
		// Attach the configured query/header parameters to every group.
		if reqParams := s.extractRequestParams(r); len(reqParams) != 0 {
			for _, g := range groups {
				g.Group.Metadata.Merge(models.NewMetadataWithMap(reqParams))
			}
		}
		s.collectorV2.CollectList(groups...)
	}

	switch s.Format {
	case common.ProtocolSLS:
		w.Header().Set("x-log-requestid", "1234567890abcde")
		w.WriteHeader(http.StatusOK)
	case common.ProtocolPyroscope, common.ProtocolPrometheus:
		// do nothing
	default:
		w.WriteHeader(http.StatusNoContent)
	}
}
// TooLarge answers the request with 413 Request Entity Too Large and a JSON
// error body.
func TooLarge(res http.ResponseWriter) {
	const body = `{"error":"http: request body too large"}`

	header := res.Header()
	header.Set("Content-Type", "application/json")
	res.WriteHeader(http.StatusRequestEntityTooLarge)
	_, _ = res.Write([]byte(body))
}
// MethodNotAllowed answers the request with 405 Method Not Allowed and a JSON
// error body.
func MethodNotAllowed(res http.ResponseWriter) {
	const body = `{"error":"http: method not allowed"}`

	header := res.Header()
	header.Set("Content-Type", "application/json")
	res.WriteHeader(http.StatusMethodNotAllowed)
	_, _ = res.Write([]byte(body))
}
// InternalServerError answers the request with 500 Internal Server Error and
// a JSON error body, matching the shape used by the other error helpers in
// this file (the response is declared as application/json, so it should not
// be empty).
func InternalServerError(res http.ResponseWriter) {
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(http.StatusInternalServerError)
	_, _ = res.Write([]byte(`{"error":"http: internal server error"}`))
}
// BadRequest answers the request with 400 Bad Request and a JSON error body.
func BadRequest(res http.ResponseWriter) {
	const body = `{"error":"http: bad request"}`

	header := res.Header()
	header.Set("Content-Type", "application/json")
	res.WriteHeader(http.StatusBadRequest)
	_, _ = res.Write([]byte(body))
}
// Start is the v1 entry point: it records the collector, marks the plugin as
// running under runner version v1 and starts the HTTP server.
func (s *ServiceHTTP) Start(c pipeline.Collector) error {
	s.collector, s.version = c, v1
	return s.start()
}
// StartService is the v2 entry point: it takes the collector from the
// pipeline context, marks the plugin as running under runner version v2 and
// starts the HTTP server.
func (s *ServiceHTTP) StartService(context pipeline.PipelineContext) error {
	s.collectorV2, s.version = context.Collector(), v2
	return s.start()
}
// start opens the listener described by Address and serves requests on a
// background goroutine, then starts the dumper if one is configured.
//
// Address handling:
//   - prefix "unix": listen on a unix socket at Address with "unix://"
//     removed, unlinking the path first when UnlinkUnixSock is set;
//   - prefix "http" (which also covers "https") or "tcp": parse Address as a
//     URL and listen on its host part;
//   - anything else: listen on Address as a TCP address.
//
// It returns the URL parse error or the listen error; in both cases nothing
// has been started and no state has been changed.
func (s *ServiceHTTP) start() error {
	var (
		listener net.Listener
		err      error
	)
	switch {
	case strings.HasPrefix(s.Address, "unix"):
		sockPath := strings.Replace(s.Address, "unix://", "", 1)
		if s.UnlinkUnixSock {
			// Best effort: the result is ignored and Listen reports any
			// remaining problem with the path.
			_ = syscall.Unlink(sockPath)
		}
		listener, err = net.Listen("unix", sockPath)
	case strings.HasPrefix(s.Address, "http"), strings.HasPrefix(s.Address, "tcp"):
		configURL, errAddr := url.Parse(s.Address)
		if errAddr != nil {
			return errAddr
		}
		listener, err = net.Listen("tcp", configURL.Host)
	default:
		listener, err = net.Listen("tcp", s.Address)
	}
	if err != nil {
		return err
	}

	server := &http.Server{
		Addr:        s.Address,
		Handler:     s,
		ReadTimeout: time.Duration(s.ReadTimeoutSec) * time.Second,
	}
	s.listener = listener
	s.server = server

	// Count the goroutine only once it is certain to run. Adding before the
	// listen attempt would leave a stale count behind on failure and make a
	// later Stop block in wg.Wait.
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		logger.Info(s.context.GetRuntimeContext(), "http server start", s.Address, "listener", listener.Addr().String())
		// Serve blocks until the listener fails or is closed (Stop closes it).
		if err := server.Serve(listener); err != nil {
			logger.Error(s.context.GetRuntimeContext(), "INIT_SERVER_ARMAR", "err", err.Error())
		}
		// Shut the server down within the configured timeout.
		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.ShutdownTimeoutSec)*time.Second)
		defer cancel()
		_ = server.Shutdown(ctx)
		logger.Info(s.context.GetRuntimeContext(), "http server shutdown", s.Address)
	}()

	if s.dumper != nil {
		s.dumper.Start()
	}
	return nil
}
// extractRequestParams collects the configured request parameters into a map.
// Each entry of QueryParams is looked up with req.FormValue and each entry of
// HeaderParams with req.Header.Get; empty values are skipped. Keys are stored
// with QueryParamPrefix / HeaderParamPrefix prepended. The returned map is
// never nil.
func (s *ServiceHTTP) extractRequestParams(req *http.Request) map[string]string {
	params := make(map[string]string, s.paramCount)

	for _, param := range s.QueryParams {
		if v := req.FormValue(param); v != "" {
			params[s.QueryParamPrefix+param] = v
		}
	}
	for _, param := range s.HeaderParams {
		if v := req.Header.Get(param); v != "" {
			params[s.HeaderParamPrefix+param] = v
		}
	}
	return params
}
// Stop closes the listener, if one was opened, and waits for the serving
// goroutine to finish; it then closes the dumper when one is configured.
// It always returns nil.
func (s *ServiceHTTP) Stop() error {
	if ln := s.listener; ln != nil {
		_ = ln.Close()
		logger.Info(s.context.GetRuntimeContext(), "http server stop", s.Address)
		s.wg.Wait()
	}
	if d := s.dumper; d != nil {
		d.Close()
	}
	return nil
}
// init registers the plugin factory under name. Each created instance starts
// from the defaults below.
func init() {
	factory := func() pipeline.ServiceInput {
		input := new(ServiceHTTP)
		input.Decoder = "ext_default_decoder"
		input.ReadTimeoutSec = 10
		input.ShutdownTimeoutSec = 5
		input.MaxBodySize = 64 * 1024 * 1024
		input.UnlinkUnixSock = true
		input.DumpDataKeepFiles = 5
		input.Tags = make(map[string]string)
		return input
	}
	pipeline.ServiceInputs[name] = factory
}