packetbeat/protos/http/http.go (757 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package http import ( "bytes" "encoding/base64" "errors" "fmt" "net" "net/url" "strconv" "strings" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/ecs" "github.com/elastic/beats/v7/packetbeat/pb" "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/beats/v7/packetbeat/protos" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" ) var ( debugf = logp.MakeDebug("http") detailedf = logp.MakeDebug("httpdetailed") ) type parserState uint8 const ( stateStart parserState = iota stateHeaders stateBody stateBodyChunkedStart stateBodyChunked stateBodyChunkedWaitFinalCRLF ) var ( unmatchedResponses = monitoring.NewInt(nil, "http.unmatched_responses") unmatchedRequests = monitoring.NewInt(nil, "http.unmatched_requests") ) type stream struct { tcptuple *common.TCPTuple data []byte parseOffset int parseState parserState bodyReceived int message *message } type httpConnectionData struct { streams [2]*stream requests messageList responses messageList } type messageList struct { head, tail *message } // HTTP application level protocol analyser plugin. type httpPlugin struct { // config ports []int sendRequest bool sendResponse bool splitCookie bool hideKeywords []string redactAuthorization bool redactHeaders []string maxMessageSize int mustDecodeBody bool parserConfig parserConfig transactionTimeout time.Duration results protos.Reporter watcher *procs.ProcessesWatcher } var ( isDebug = false isDetailed = false ) func init() { protos.Register("http", New) } func New( testMode bool, results protos.Reporter, watcher *procs.ProcessesWatcher, cfg *conf.C, ) (protos.Plugin, error) { p := &httpPlugin{} config := defaultConfig if !testMode { if err := cfg.Unpack(&config); err != nil { return nil, err } } if err := p.init(results, watcher, &config); err != nil { return nil, err } return p, nil } // Init initializes the HTTP protocol analyser. func (http *httpPlugin) init(results protos.Reporter, watcher *procs.ProcessesWatcher, config *httpConfig) error { http.setFromConfig(config) isDebug = logp.IsDebug("http") isDetailed = logp.IsDebug("httpdetailed") http.results = results http.watcher = watcher return nil } func (http *httpPlugin) setFromConfig(config *httpConfig) { http.ports = config.Ports http.sendRequest = config.SendRequest http.sendResponse = config.SendResponse http.hideKeywords = config.HideKeywords http.redactAuthorization = config.RedactAuthorization http.splitCookie = config.SplitCookie http.parserConfig.realIPHeader = strings.ToLower(config.RealIPHeader) http.transactionTimeout = config.TransactionTimeout http.mustDecodeBody = config.DecodeBody http.redactHeaders = make([]string, len(config.RedactHeaders)) for i, header := range config.RedactHeaders { http.redactHeaders[i] = strings.ToLower(header) } for _, list := range [][]string{config.IncludeBodyFor, config.IncludeRequestBodyFor} { http.parserConfig.includeRequestBodyFor = append(http.parserConfig.includeRequestBodyFor, list...) } for _, list := range [][]string{config.IncludeBodyFor, config.IncludeResponseBodyFor} { http.parserConfig.includeResponseBodyFor = append(http.parserConfig.includeResponseBodyFor, list...) } http.maxMessageSize = config.MaxMessageSize if config.SendAllHeaders { http.parserConfig.sendHeaders = true http.parserConfig.sendAllHeaders = true } else { if len(config.SendHeaders) > 0 { http.parserConfig.sendHeaders = true http.parserConfig.headersWhitelist = map[string]bool{} for _, hdr := range config.SendHeaders { http.parserConfig.headersWhitelist[strings.ToLower(hdr)] = true } } } } // GetPorts lists the port numbers the HTTP protocol analyser will handle. func (http *httpPlugin) GetPorts() []int { return http.ports } // messageGap is called when a gap of size `nbytes` is found in the // tcp stream. Decides if we can ignore the gap or it's a parser error // and we need to drop the stream. func (http *httpPlugin) messageGap(s *stream, nbytes int) (ok bool, complete bool) { m := s.message switch s.parseState { case stateStart, stateHeaders: // we know we cannot recover from these return false, false case stateBody: if isDebug { debugf("gap in body: %d", nbytes) } if m.isRequest { if !m.packetLossReq { m.packetLossReq = true m.notes = append(m.notes, "Packet loss while capturing the request") } } else { if !m.packetLossResp { m.packetLossResp = true m.notes = append(m.notes, "Packet loss while capturing the response") } } if !m.hasContentLength && (bytes.Equal(m.connection, constClose) || (isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) { s.bodyReceived += nbytes m.contentLength += nbytes return true, false } else if len(s.data)+nbytes >= m.contentLength-s.bodyReceived { // we're done, but the last portion of the data is gone return true, true } else { s.bodyReceived += nbytes return true, false } } // assume we cannot recover return false, false } func (st *stream) PrepareForNewMessage() { st.parseState = stateStart st.parseOffset = 0 st.bodyReceived = 0 st.message = nil } // Called when the parser has identified the boundary // of a message. func (http *httpPlugin) messageComplete( conn *httpConnectionData, tcptuple *common.TCPTuple, dir uint8, st *stream, ) { http.handleHTTP(conn, st.message, tcptuple, dir) } // ConnectionTimeout returns the configured HTTP transaction timeout. func (http *httpPlugin) ConnectionTimeout() time.Duration { return http.transactionTimeout } // Parse function is used to process TCP payloads. func (http *httpPlugin) Parse( pkt *protos.Packet, tcptuple *common.TCPTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData { conn := ensureHTTPConnection(private) conn = http.doParse(conn, pkt, tcptuple, dir) if conn == nil { return nil } return conn } func ensureHTTPConnection(private protos.ProtocolData) *httpConnectionData { conn := getHTTPConnection(private) if conn == nil { conn = &httpConnectionData{} } return conn } func getHTTPConnection(private protos.ProtocolData) *httpConnectionData { if private == nil { return nil } priv, ok := private.(*httpConnectionData) if !ok { logp.Warn("http connection data type error") return nil } if priv == nil { logp.Warn("Unexpected: http connection data not set") return nil } return priv } // Parse function is used to process TCP payloads. func (http *httpPlugin) doParse( conn *httpConnectionData, pkt *protos.Packet, tcptuple *common.TCPTuple, dir uint8, ) *httpConnectionData { if isDetailed { detailedf("Payload received: [%s]", pkt.Payload) } extraMsgSize := 0 // size of a "seen" packet for which we don't store the actual bytes st := conn.streams[dir] if st == nil { st = newStream(pkt, tcptuple) conn.streams[dir] = st } else { // concatenate bytes totalLength := len(st.data) + len(pkt.Payload) msg := st.message if msg != nil { totalLength += len(msg.body) } if totalLength > http.maxMessageSize { if isDebug { debugf("Stream data too large, ignoring message") } extraMsgSize = len(pkt.Payload) } else { st.data = append(st.data, pkt.Payload...) } } for len(st.data) > 0 || extraMsgSize > 0 { if st.message == nil { st.message = &message{ts: pkt.Ts} } parser := newParser(&http.parserConfig) ok, complete := parser.parse(st, extraMsgSize) extraMsgSize = 0 if !ok { // drop this tcp stream. Will retry parsing with the next // segment in it conn.streams[dir] = nil return conn } if !complete { // wait for more data break } // all ok, ship it http.messageComplete(conn, tcptuple, dir, st) // and reset stream for next message st.PrepareForNewMessage() } return conn } func newStream(pkt *protos.Packet, tcptuple *common.TCPTuple) *stream { return &stream{ tcptuple: tcptuple, data: pkt.Payload, message: &message{ts: pkt.Ts}, } } // ReceivedFin will be called when TCP transaction is terminating. func (http *httpPlugin) ReceivedFin(tcptuple *common.TCPTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData { debugf("Received FIN") conn := getHTTPConnection(private) if conn == nil { return private } stream := conn.streams[dir] if stream == nil { return conn } // send whatever data we got so far as complete. This // is needed for the HTTP/1.0 without Content-Length situation. if stream.message != nil { http.handleHTTP(conn, stream.message, tcptuple, dir) // and reset message. Probably not needed, just to be sure. stream.PrepareForNewMessage() } return conn } // GapInStream is called when a gap of nbytes bytes is found in the stream (due // to packet loss). func (http *httpPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool, ) { conn := getHTTPConnection(private) if conn == nil { return private, false } stream := conn.streams[dir] if stream == nil || stream.message == nil { // nothing to do return private, false } ok, complete := http.messageGap(stream, nbytes) if isDetailed { detailedf("messageGap returned ok=%v complete=%v", ok, complete) } if !ok { // on errors, drop stream conn.streams[dir] = nil return conn, true } if complete { // Current message is complete, we need to publish from here http.messageComplete(conn, tcptuple, dir, stream) } // don't drop the stream, we can ignore the gap return private, false } func (http *httpPlugin) handleHTTP( conn *httpConnectionData, m *message, tcptuple *common.TCPTuple, dir uint8, ) { m.tcpTuple = *tcptuple m.direction = dir m.cmdlineTuple = http.watcher.FindProcessesTupleTCP(tcptuple.IPPort()) if !http.redactAuthorization { m.username = extractBasicAuthUser(m.headers) } http.hideHeaders(m) if m.isRequest { if isDebug { debugf("Received request with tuple: %s", m.tcpTuple) } conn.requests.append(m) } else { if isDebug { debugf("Received response with tuple: %s", m.tcpTuple) } conn.responses.append(m) http.correlate(conn) } } func (http *httpPlugin) flushResponses(conn *httpConnectionData) { for !conn.responses.empty() { unmatchedResponses.Add(1) resp := conn.responses.pop() debugf("Response from unknown transaction: %s. Reporting error.", resp.tcpTuple) if resp.statusCode == 100 { debugf("Drop first 100-continue response") return } event := http.newTransaction(nil, resp) http.publishTransaction(event) } } func (http *httpPlugin) flushRequests(conn *httpConnectionData) { for !conn.requests.empty() { unmatchedRequests.Add(1) requ := conn.requests.pop() debugf("Request from unknown transaction %s. Reporting error.", requ.tcpTuple) event := http.newTransaction(requ, nil) http.publishTransaction(event) } } func (http *httpPlugin) correlate(conn *httpConnectionData) { // drop responses with missing requests if conn.requests.empty() { http.flushResponses(conn) return } // merge requests with responses into transactions for !conn.responses.empty() && !conn.requests.empty() { requ := conn.requests.pop() resp := conn.responses.pop() event := http.newTransaction(requ, resp) if isDebug { debugf("HTTP transaction completed") } http.publishTransaction(event) } } func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event { status := common.OK_STATUS if resp == nil { status = common.ERROR_STATUS if requ != nil { requ.notes = append(requ.notes, "Unmatched request") } } else if resp.statusCode >= 400 { status = common.ERROR_STATUS } if requ == nil { status = common.ERROR_STATUS if resp != nil { resp.notes = append(resp.notes, "Unmatched response") } } var ts time.Time var src, dst *common.Endpoint for _, m := range []*message{requ, resp} { if m == nil { continue } ts = m.ts src, dst = m.getEndpoints() break } evt, pbf := pb.NewBeatEvent(ts) pbf.SetSource(src) pbf.SetDestination(dst) pbf.AddIP(src.IP) pbf.AddIP(dst.IP) pbf.Network.Transport = "tcp" pbf.Network.Protocol = "http" fields := evt.Fields fields["type"] = pbf.Network.Protocol fields["status"] = status var httpFields ProtocolFields if requ != nil { http.decodeBody(requ) path, params, err := http.extractParameters(requ) if err != nil { logp.Warn("Fail to parse HTTP parameters: %v", err) } pbf.Source.Bytes = int64(requ.size) host, port := extractHostHeader(string(requ.host)) if net.ParseIP(host) == nil { pbf.Destination.Domain = host pbf.AddHost(host) } else { pbf.AddIP(host) } if port == 0 { port = int(pbf.Destination.Port) } else if port != int(pbf.Destination.Port) { requ.notes = append(requ.notes, "Host header port number mismatch") } pbf.Event.Start = requ.ts pbf.Network.ForwardedIP = string(requ.realIP) pbf.AddIP(string(requ.realIP)) pbf.Error.Message = requ.notes // http httpFields.Version = requ.version.String() httpFields.RequestBytes = int64(requ.size) httpFields.RequestBodyBytes = int64(requ.contentLength) httpFields.RequestMethod = requ.method httpFields.RequestReferrer = requ.referer pbf.AddHost(string(requ.referer)) if requ.sendBody && len(requ.body) > 0 { httpFields.RequestBodyBytes = int64(len(requ.body)) httpFields.RequestBodyContent = common.NetString(requ.body) } httpFields.RequestHeaders = http.collectHeaders(requ) // url u := newURL(host, int64(port), path, params) pb.MarshalStruct(evt.Fields, "url", u) // user-agent userAgent := ecs.UserAgent{Original: string(requ.userAgent)} pb.MarshalStruct(evt.Fields, "user_agent", userAgent) // packetbeat root fields if http.sendRequest { fields["request"] = string(http.makeRawMessage(requ)) } fields["method"] = httpFields.RequestMethod fields["query"] = fmt.Sprintf("%s %s", requ.method, path) if requ.username != "" { fields["user.name"] = requ.username pbf.AddUser(requ.username) } } if resp != nil { http.decodeBody(resp) pbf.Destination.Bytes = int64(resp.size) pbf.Event.End = resp.ts pbf.Error.Message = append(pbf.Error.Message, resp.notes...) // http httpFields.ResponseStatusCode = int64(resp.statusCode) httpFields.ResponseStatusPhrase = bytes.ToLower(resp.statusPhrase) httpFields.ResponseBytes = int64(resp.size) httpFields.ResponseBodyBytes = int64(resp.contentLength) if resp.sendBody && len(resp.body) > 0 { httpFields.ResponseBodyBytes = int64(len(resp.body)) httpFields.ResponseBodyContent = common.NetString(resp.body) } httpFields.ResponseHeaders = http.collectHeaders(resp) // packetbeat root fields if http.sendResponse { fields["response"] = string(http.makeRawMessage(resp)) } } pb.MarshalStruct(evt.Fields, "http", httpFields) return evt } func (http *httpPlugin) makeRawMessage(m *message) string { if m.sendBody { var b strings.Builder b.Grow(len(m.rawHeaders) + len(m.body)) b.Write(m.rawHeaders) b.Write(m.body) return b.String() } return string(m.rawHeaders) } func (http *httpPlugin) publishTransaction(event beat.Event) { if http.results == nil { return } http.results(event) } func (http *httpPlugin) collectHeaders(m *message) mapstr.M { hdrs := map[string]interface{}{} hdrs["content-length"] = m.contentLength if len(m.contentType) > 0 { hdrs["content-type"] = m.contentType } if http.parserConfig.sendHeaders { cookie := "cookie" if !m.isRequest { cookie = "set-cookie" } for name, value := range m.headers { switch { case bytes.Equal([]byte(name), nameContentLength), bytes.Equal([]byte(name), nameContentType): continue } if http.splitCookie && name == cookie { hdrs[name] = splitCookiesHeader(string(value)) } else { hdrs[name] = value } } } return hdrs } func (http *httpPlugin) decodeBody(m *message) { if m.saveBody && len(m.body) > 0 { if http.mustDecodeBody && len(m.encodings) > 0 { var err error m.body, err = decodeBody(m.body, m.encodings, http.maxMessageSize) if err != nil { // Body can contain partial data m.notes = append(m.notes, err.Error()) } } } } func decodeBody(body []byte, encodings []string, maxSize int) (result []byte, err error) { if isDebug { debugf("decoding body with encodings=%v", encodings) } for idx := len(encodings) - 1; idx >= 0; idx-- { format := encodings[idx] body, err = decodeHTTPBody(body, format, maxSize) if err != nil { // Do not output a partial body unless failure occurs on the // last decoder. if idx != 0 { body = nil } return body, fmt.Errorf("unable to decode body using %s encoding: %w", format, err) } } return body, nil } func splitCookiesHeader(headerVal string) map[string]string { cookies := map[string]string{} cstring := strings.Split(headerVal, ";") for _, cval := range cstring { cookie := strings.SplitN(cval, "=", 2) if len(cookie) == 2 { cookies[strings.ToLower(strings.TrimSpace(cookie[0]))] = parseCookieValue(strings.TrimSpace(cookie[1])) } } return cookies } func parseCookieValue(raw string) string { // Strip the quotes, if present. if len(raw) > 1 && raw[0] == '"' && raw[len(raw)-1] == '"' { raw = raw[1 : len(raw)-1] } return raw } func extractHostHeader(header string) (host string, port int) { if header == "" || net.ParseIP(header) != nil { return header, port } host, ps, err := net.SplitHostPort(header) if err != nil { var addrError *net.AddrError if errors.As(err, &addrError) && addrError.Err == "missing port in address" { return trimSquareBracket(header), port } } pi, err := strconv.ParseInt(ps, 10, 16) if err != nil || pi == 0 { return header, port } return trimSquareBracket(host), int(pi) } func trimSquareBracket(s string) string { s, ok := strings.CutPrefix(s, "[") if !ok { return s } return strings.TrimSuffix(s, "]") } func (http *httpPlugin) hideHeaders(m *message) { for _, header := range http.redactHeaders { if _, exists := m.headers[header]; exists { m.headers[header] = []byte("REDACTED") } } if !m.isRequest || !http.redactAuthorization { return } msg := m.rawHeaders limit := len(msg) // byte64 != encryption, so obscure it in headers in case of Basic Authentication redactHeaders := []string{"authorization", "proxy-authorization"} authText := []byte("uthorization:") // [aA] case insensitive, also catches Proxy-Authorization: authHeaderStartX := m.headerOffset authHeaderEndX := limit for authHeaderStartX < limit { if isDebug { debugf("looking for authorization from %d to %d", authHeaderStartX, authHeaderEndX) } startOfHeader := bytes.Index(msg[authHeaderStartX:], authText) if startOfHeader >= 0 { authHeaderStartX = authHeaderStartX + startOfHeader endOfHeader := bytes.Index(msg[authHeaderStartX:], constCRLF) if endOfHeader >= 0 { authHeaderEndX = authHeaderStartX + endOfHeader if authHeaderEndX > limit { authHeaderEndX = limit } if isDebug { debugf("Redact authorization from %d to %d", authHeaderStartX, authHeaderEndX) } for i := authHeaderStartX + len(authText); i < authHeaderEndX; i++ { msg[i] = byte('*') } } } authHeaderStartX = authHeaderEndX + len(constCRLF) authHeaderEndX = len(m.rawHeaders) } for _, header := range redactHeaders { if len(m.headers[header]) > 0 { m.headers[header] = []byte("*") } } } func (http *httpPlugin) hideSecrets(values url.Values) url.Values { params := url.Values{} for key, array := range values { for _, value := range array { if http.isSecretParameter(key) { params.Add(key, "xxxxx") } else { params.Add(key, value) } } } return params } // extractParameters parses the URL and the form parameters and replaces the secrets // with the string xxxxx. The parameters containing secrets are defined in http.Hide_secrets. // Returns the Request URI path and the (adjusted) parameters. func (http *httpPlugin) extractParameters(m *message) (path string, params string, err error) { var values url.Values u, err := url.Parse(string(m.requestURI)) if err != nil { return } values = u.Query() path = u.Path paramsMap := http.hideSecrets(values) if m.contentLength > 0 && m.saveBody && bytes.Contains(m.contentType, []byte("urlencoded")) { values, err = url.ParseQuery(string(m.body)) if err != nil { return } for key, value := range http.hideSecrets(values) { paramsMap[key] = value } } params = paramsMap.Encode() if isDetailed { detailedf("Form parameters: %s", params) } return } func (http *httpPlugin) isSecretParameter(key string) bool { for _, keyword := range http.hideKeywords { if strings.ToLower(key) == keyword { return true } } return false } func (http *httpPlugin) Expired(tuple *common.TCPTuple, private protos.ProtocolData) { conn := getHTTPConnection(private) if conn == nil { return } if isDebug { debugf("expired connection %s", tuple) } // terminate streams for dir, s := range conn.streams { // Do not send incomplete or empty messages if s != nil && s.message != nil && s.message.headersReceived() { if isDebug { debugf("got message %+v", s.message) } http.handleHTTP(conn, s.message, tuple, uint8(dir)) s.PrepareForNewMessage() } } // correlate transactions http.correlate(conn) // flush uncorrelated requests and responses http.flushRequests(conn) http.flushResponses(conn) } func (ml *messageList) append(msg *message) { if ml.tail == nil { ml.head = msg } else { ml.tail.next = msg } msg.next = nil ml.tail = msg } func (ml *messageList) empty() bool { return ml.head == nil } func (ml *messageList) pop() *message { if ml.head == nil { return nil } msg := ml.head ml.head = ml.head.next if ml.head == nil { ml.tail = nil } return msg } func extractBasicAuthUser(headers map[string]common.NetString) string { const prefix = "Basic " auth := string(headers["authorization"]) if len(auth) < len(prefix) || !strings.EqualFold(auth[:len(prefix)], prefix) { return "" } c, err := base64.StdEncoding.DecodeString(auth[len(prefix):]) if err != nil { c, err = base64.RawStdEncoding.DecodeString(auth[len(prefix):]) if err != nil { return "" } } cs := string(c) s := strings.IndexByte(cs, ':') if s < 0 { return "" } return cs[:s] }