packetbeat/protos/http/http_parser.go (549 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package http import ( "bytes" "errors" "fmt" "strconv" "time" "unicode" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/streambuf" "github.com/elastic/beats/v7/packetbeat/protos/tcp" "github.com/elastic/elastic-agent-libs/logp" ) // Http Message type message struct { ts time.Time hasContentLength bool headerOffset int version version connection common.NetString chunkedLength int isRequest bool tcpTuple common.TCPTuple cmdlineTuple *common.ProcessTuple direction uint8 // Request Info requestURI common.NetString method common.NetString statusCode uint16 statusPhrase common.NetString realIP common.NetString // Http Headers contentLength int contentType common.NetString host common.NetString referer common.NetString userAgent common.NetString encodings []string isChunked bool headers map[string]common.NetString size uint64 username string rawHeaders []byte // sendBody determines if the body must be sent along with the event // because the content-type is included in the send_body_for setting. sendBody bool // saveBody determines if the body must be saved. It is set when sendBody // is true or when the body type is form-urlencoded. saveBody bool body []byte notes []string packetLossReq bool packetLossResp bool next *message } type version struct { major uint8 minor uint8 } func (v version) String() string { if v.major == 1 && v.minor == 1 { return "1.1" } return fmt.Sprintf("%d.%d", v.major, v.minor) } type parser struct { config *parserConfig } type parserConfig struct { realIPHeader string sendHeaders bool sendAllHeaders bool headersWhitelist map[string]bool includeRequestBodyFor []string includeResponseBodyFor []string } var ( transferEncodingChunked = "chunked" constCRLF = []byte("\r\n") constClose = []byte("close") constKeepAlive = []byte("keep-alive") constHTTPVersion = []byte("HTTP/") nameContentLength = []byte("content-length") nameContentType = []byte("content-type") nameTransferEncoding = []byte("transfer-encoding") nameContentEncoding = []byte("content-encoding") nameConnection = []byte("connection") nameHost = []byte("host") nameReferer = []byte("referer") nameUserAgent = []byte("user-agent") ) func newParser(config *parserConfig) *parser { return &parser{config: config} } func (parser *parser) parse(s *stream, extraMsgSize int) (bool, bool) { m := s.message if extraMsgSize > 0 { // A packet of extraMsgSize size was seen, but we don't have // its actual bytes. This is only usable in the `stateBody` state. if s.parseState != stateBody { return false, false } return parser.eatBody(s, m, extraMsgSize) } for s.parseOffset < len(s.data) { switch s.parseState { case stateStart: if cont, ok, complete := parser.parseHTTPLine(s, m); !cont { return ok, complete } case stateHeaders: if cont, ok, complete := parser.parseHeaders(s, m); !cont { return ok, complete } case stateBody: return parser.parseBody(s, m) case stateBodyChunkedStart: if cont, ok, complete := parser.parseBodyChunkedStart(s, m); !cont { return ok, complete } case stateBodyChunked: if cont, ok, complete := parser.parseBodyChunked(s, m); !cont { return ok, complete } case stateBodyChunkedWaitFinalCRLF: return parser.parseBodyChunkedWaitFinalCRLF(s, m) } } return true, false } func (*parser) parseHTTPLine(s *stream, m *message) (cont, ok, complete bool) { i := bytes.Index(s.data[s.parseOffset:], []byte("\r\n")) if i == -1 { return false, true, false } // Very basic tests on the first line. Just to check that // we have what looks as an HTTP message var version []byte var err error fline := s.data[s.parseOffset:i] if len(fline) < 9 { if isDebug { debugf("First line too small") } return false, false, false } if bytes.Equal(fline[0:5], constHTTPVersion) { // RESPONSE m.isRequest = false version = fline[5:8] m.statusCode, m.statusPhrase, err = parseResponseStatus(fline[9:]) if err != nil { logp.Warn("Failed to understand HTTP response status: %s", fline[9:]) return false, false, false } if isDebug { debugf("HTTP status_code=%d, status_phrase=%s", m.statusCode, m.statusPhrase) } } else { // REQUEST afterMethodIdx := bytes.IndexFunc(fline, unicode.IsSpace) afterRequestURIIdx := bytes.LastIndexFunc(fline, unicode.IsSpace) // Make sure we have the VERB + URI + HTTP_VERSION if afterMethodIdx == -1 || afterRequestURIIdx == -1 || afterMethodIdx == afterRequestURIIdx { if isDebug { debugf("Couldn't understand HTTP request: %s", fline) } return false, false, false } m.method = common.NetString(fline[:afterMethodIdx]) m.requestURI = common.NetString(fline[afterMethodIdx+1 : afterRequestURIIdx]) versionIdx := afterRequestURIIdx + len(constHTTPVersion) + 1 if len(fline) > versionIdx && bytes.Equal(fline[afterRequestURIIdx+1:versionIdx], constHTTPVersion) { m.isRequest = true version = fline[versionIdx:] } else { if isDebug { debugf("Couldn't understand HTTP version: %s", fline) } return false, false, false } } m.version.major, m.version.minor, err = parseVersion(version) if err != nil { if isDebug { debugf("Failed to understand HTTP version: %v", version) } m.version.major = 1 m.version.minor = 0 } if isDebug { debugf("HTTP version %d.%d", m.version.major, m.version.minor) } // ok so far s.parseOffset = i + 2 m.headerOffset = s.parseOffset s.parseState = stateHeaders return true, true, true } func parseResponseStatus(s []byte) (uint16, []byte, error) { if isDebug { debugf("parseResponseStatus: %s", s) } var phrase []byte p := bytes.IndexByte(s, ' ') if p == -1 { p = len(s) } else { phrase = s[p+1:] } statusCode, err := parseInt(s[0:p]) if err != nil { return 0, nil, fmt.Errorf("Unable to parse status code from [%s]", s) } return uint16(statusCode), phrase, nil } func parseVersion(s []byte) (uint8, uint8, error) { if len(s) < 3 { return 0, 0, errors.New("Invalid version") } major := s[0] - '0' minor := s[2] - '0' if major > 1 || minor > 2 { return 0, 0, errors.New("unsupported version") } return uint8(major), uint8(minor), nil } func (parser *parser) parseHeaders(s *stream, m *message) (cont, ok, complete bool) { if len(s.data)-s.parseOffset >= 2 && bytes.Equal(s.data[s.parseOffset:s.parseOffset+2], []byte("\r\n")) { // EOH m.size = uint64(s.parseOffset + 2) m.rawHeaders = s.data[:m.size] s.data = s.data[m.size:] s.parseOffset = 0 if !m.isRequest && ((100 <= m.statusCode && m.statusCode < 200) || m.statusCode == 204 || m.statusCode == 304) { // response with a 1xx, 204 , or 304 status code is always terminated // by the first empty line after the header fields if isDebug { debugf("Terminate response, status code %d", m.statusCode) } return false, true, true } if m.isRequest { m.sendBody = parser.shouldIncludeInBody(m.contentType, parser.config.includeRequestBodyFor) } else { m.sendBody = parser.shouldIncludeInBody(m.contentType, parser.config.includeResponseBodyFor) } m.saveBody = m.sendBody || (m.contentLength > 0 && bytes.Contains(m.contentType, []byte("urlencoded"))) if m.isChunked { // support for HTTP/1.1 Chunked transfer // Transfer-Encoding overrides the Content-Length if isDebug { debugf("Read chunked body") } s.parseState = stateBodyChunkedStart return true, true, true } if m.contentLength == 0 && (m.isRequest || m.hasContentLength) { if isDebug { debugf("Empty content length, ignore body") } // Ignore body for request that contains a message body but not a Content-Length return false, true, true } if isDebug { debugf("Read body") } s.parseState = stateBody } else { ok, hfcomplete, offset := parser.parseHeader(m, s.data[s.parseOffset:]) if !ok { return false, false, false } if !hfcomplete { return false, true, false } s.parseOffset += offset } return true, true, true } func (parser *parser) parseHeader(m *message, data []byte) (bool, bool, int) { if m.headers == nil { m.headers = make(map[string]common.NetString) } i := bytes.Index(data, []byte(":")) if i == -1 { // Expected \":\" in headers. Assuming incomplete" return true, false, 0 } config := parser.config // enabled if required. Allocs for parameters slow down parser big times if isDetailed { detailedf("Data: %s", data) detailedf("Header: %s", data[:i]) } // skip folding line for p := i + 1; p < len(data); { q := bytes.Index(data[p:], constCRLF) if q == -1 { // Assuming incomplete return true, false, 0 } p += q if len(data) > p && (data[p+1] == ' ' || data[p+1] == '\t') { p = p + 2 } else { var headerNameBuf [140]byte headerName := toLower(headerNameBuf[:], data[:i]) headerVal := trim(data[i+1 : p]) if isDebug { debugf("Header: '%s' Value: '%s'\n", data[:i], headerVal) } // Headers we need for parsing. Make sure we always // capture their value if bytes.Equal(headerName, nameContentLength) { m.contentLength, _ = parseInt(headerVal) m.hasContentLength = true } else if bytes.Equal(headerName, nameContentType) { m.contentType = headerVal } else if bytes.Equal(headerName, nameTransferEncoding) { encodings := parseCommaSeparatedList(headerVal) // 'chunked' can only appear at the end if n := len(encodings); n > 0 && encodings[n-1] == transferEncodingChunked { m.isChunked = true encodings = encodings[:n-1] } if len(encodings) > 0 { // Append at the end of encodings. If a content-encoding // header is also present, it was applied by sender before // transfer-encoding. m.encodings = append(m.encodings, encodings...) } } else if bytes.Equal(headerName, nameContentEncoding) { encodings := parseCommaSeparatedList(headerVal) // Append at the beginning of m.encodings, as Content-Encoding // is supposed to be applied before Transfer-Encoding. m.encodings = append(encodings, m.encodings...) } else if bytes.Equal(headerName, nameConnection) { m.connection = headerVal } else if len(config.realIPHeader) > 0 && bytes.Equal(headerName, []byte(config.realIPHeader)) { if ips := bytes.SplitN(headerVal, []byte{','}, 2); len(ips) > 0 { m.realIP = trim(ips[0]) } } else if bytes.Equal(headerName, nameHost) { m.host = headerVal } else if bytes.Equal(headerName, nameReferer) { m.referer = headerVal } else if bytes.Equal(headerName, nameUserAgent) { m.userAgent = headerVal } if config.sendHeaders { if !config.sendAllHeaders { _, exists := config.headersWhitelist[string(headerName)] if !exists { return true, true, p + 2 } } if val, ok := m.headers[string(headerName)]; ok { composed := make([]byte, len(val)+len(headerVal)+2) off := copy(composed, val) copy(composed[off:], []byte(", ")) copy(composed[off+2:], headerVal) m.headers[string(headerName)] = composed } else { m.headers[string(headerName)] = headerVal } } return true, true, p + 2 } } return true, false, len(data) } func parseCommaSeparatedList(s common.NetString) (list []string) { values := bytes.Split(s, []byte(",")) list = make([]string, len(values)) for idx := range values { list[idx] = string(bytes.ToLower(bytes.Trim(values[idx], " "))) } return list } func (*parser) parseBody(s *stream, m *message) (ok, complete bool) { nbytes := len(s.data) if !m.hasContentLength && (bytes.Equal(m.connection, constClose) || (isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) { m.size += uint64(nbytes) s.bodyReceived += nbytes m.contentLength += nbytes // HTTP/1.0 no content length. Add until the end of the connection if isDebug { debugf("http conn close, received %d", len(s.data)) } if m.saveBody { m.body = append(m.body, s.data...) } s.data = nil return true, false } else if nbytes >= m.contentLength-s.bodyReceived { wanted := m.contentLength - s.bodyReceived if m.saveBody { m.body = append(m.body, s.data[:wanted]...) } s.bodyReceived = m.contentLength m.size += uint64(wanted) s.data = s.data[wanted:] return true, true } else { if m.saveBody { m.body = append(m.body, s.data...) } s.data = nil s.bodyReceived += nbytes m.size += uint64(nbytes) if isDebug { debugf("bodyReceived: %d", s.bodyReceived) } return true, false } } // eatBody acts as if size bytes were received, without having access to // those bytes. func (*parser) eatBody(s *stream, m *message, size int) (ok, complete bool) { if isDebug { debugf("eatBody body") } if !m.hasContentLength && (bytes.Equal(m.connection, constClose) || (isVersion(m.version, 1, 0) && !bytes.Equal(m.connection, constKeepAlive))) { // HTTP/1.0 no content length. Add until the end of the connection if isDebug { debugf("http conn close, received %d", size) } m.size += uint64(size) s.bodyReceived += size m.contentLength += size return true, false } else if size >= m.contentLength-s.bodyReceived { wanted := m.contentLength - s.bodyReceived s.bodyReceived += wanted m.size = uint64(len(m.rawHeaders) + m.contentLength) return true, true } else { s.bodyReceived += size m.size += uint64(size) if isDebug { debugf("bodyReceived: %d", s.bodyReceived) } return true, false } } func (*parser) parseBodyChunkedStart(s *stream, m *message) (cont, ok, complete bool) { // read hexa length i := bytes.Index(s.data, constCRLF) if i == -1 { return false, true, false } line := string(s.data[:i]) chunkLength, err := strconv.ParseInt(line, 16, 32) if err != nil { logp.Warn("Failed to understand chunked body start line") return false, false, false } m.chunkedLength = int(chunkLength) s.data = s.data[i+2:] //+ \r\n m.size += uint64(i + 2) if m.chunkedLength == 0 { if len(s.data) < 2 { s.parseState = stateBodyChunkedWaitFinalCRLF return false, true, false } m.size += 2 if s.data[0] != '\r' || s.data[1] != '\n' { logp.Warn("Expected CRLF sequence at end of message") return false, false, false } s.data = s.data[2:] return false, true, true } s.bodyReceived = 0 s.parseState = stateBodyChunked return true, true, false } func (*parser) parseBodyChunked(s *stream, m *message) (cont, ok, complete bool) { wanted := m.chunkedLength - s.bodyReceived if len(s.data) >= wanted+2 /*\r\n*/ { // Received more data than expected if m.saveBody { m.body = append(m.body, s.data[:wanted]...) } m.size += uint64(wanted + 2) s.data = s.data[wanted+2:] m.contentLength += m.chunkedLength s.parseState = stateBodyChunkedStart return true, true, false } if len(s.data) >= wanted { // we need need to wait for the +2, else we can crash on next call return false, true, false } // Received less data than expected if m.saveBody { m.body = append(m.body, s.data...) } s.bodyReceived += len(s.data) m.size += uint64(len(s.data)) s.data = nil return false, true, false } func (*parser) parseBodyChunkedWaitFinalCRLF(s *stream, m *message) (ok, complete bool) { if len(s.data) < 2 { return true, false } m.size += 2 if s.data[0] != '\r' || s.data[1] != '\n' { logp.Warn("Expected CRLF sequence at end of message") return false, false } s.data = s.data[2:] return true, true } func (parser *parser) shouldIncludeInBody(contenttype []byte, capturedContentTypes []string) bool { for _, include := range capturedContentTypes { if bytes.Contains(contenttype, []byte(include)) { if isDebug { debugf("Should Include Body = true Content-Type %s include_body %s", contenttype, include) } return true } } if isDebug { debugf("Should Include Body = false Content-Type %s", contenttype) } return false } func (m *message) headersReceived() bool { return m.headerOffset > 0 } func (m *message) getEndpoints() (src *common.Endpoint, dst *common.Endpoint) { source, destination := common.MakeEndpointPair(m.tcpTuple.BaseTuple, m.cmdlineTuple) src, dst = &source, &destination if m.direction == tcp.TCPDirectionReverse { src, dst = dst, src } return src, dst } func isVersion(v version, major, minor uint8) bool { return v.major == major && v.minor == minor } func trim(buf []byte) []byte { return trimLeft(trimRight(buf)) } func trimLeft(buf []byte) []byte { for i, b := range buf { if b != ' ' && b != '\t' { return buf[i:] } } return nil } func trimRight(buf []byte) []byte { for i := len(buf) - 1; i > 0; i-- { b := buf[i] if b != ' ' && b != '\t' { return buf[:i+1] } } return nil } func parseInt(line []byte) (int, error) { buf := streambuf.NewFixed(line) i, err := buf.IntASCII(false) return int(i), err // TODO: is it an error if 'buf.Len() != 0 {}' ? } func toLower(buf, in []byte) []byte { if len(in) > len(buf) { goto unbufferedToLower } for i, b := range in { if b > 127 { goto unbufferedToLower } if 'A' <= b && b <= 'Z' { b = b - 'A' + 'a' } buf[i] = b } return buf[:len(in)] unbufferedToLower: return bytes.ToLower(in) }