pkg/accesslog/collector/protocols/http1.go (261 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package protocols import ( "container/list" "fmt" "io" "github.com/apache/skywalking-rover/pkg/accesslog/common" "github.com/apache/skywalking-rover/pkg/accesslog/events" "github.com/apache/skywalking-rover/pkg/accesslog/forwarder" "github.com/apache/skywalking-rover/pkg/logger" "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader" "github.com/apache/skywalking-rover/pkg/tools/buffer" "github.com/apache/skywalking-rover/pkg/tools/enums" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" ) var http1Log = logger.GetLogger("accesslog", "collector", "protocols", "http1") var http1AnalyzeMaxRetryCount = 3 type HTTP1ProtocolAnalyzer interface { HandleHTTPData(metrics *HTTP1Metrics, connection *PartitionConnection, request *reader.Request, response *reader.Response) error OnProtocolBreak(metrics *HTTP1Metrics, connection *PartitionConnection) } type HTTP1Protocol struct { ctx *common.AccessLogContext analyzer HTTP1ProtocolAnalyzer reader *reader.Reader } func NewHTTP1Analyzer(ctx *common.AccessLogContext, analyze HTTP1ProtocolAnalyzer) *HTTP1Protocol { protocol := &HTTP1Protocol{ctx: ctx, reader: reader.NewReader()} if analyze == nil { protocol.analyzer = protocol } else { protocol.analyzer = analyze } return protocol } type HTTP1Metrics struct { ConnectionID uint64 RandomID uint64 halfRequests *list.List analyzeUnFinished *list.List } func (p *HTTP1Protocol) GenerateConnection(connectionID, randomID uint64) ProtocolMetrics { return &HTTP1Metrics{ ConnectionID: connectionID, RandomID: randomID, halfRequests: list.New(), analyzeUnFinished: list.New(), } } type HTTP1AnalyzeUnFinished struct { request *reader.Request response *reader.Response retryCount int } func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error { metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics) buf := connection.Buffer(enums.ConnectionProtocolHTTP) http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d", metrics.ConnectionID, metrics.RandomID, buf.DataLength()) p.handleUnFinishedEvents(metrics, connection) buf.ResetForLoopReading() for { if !buf.PrepareForReading() { return nil } messageType, err := p.reader.IdentityMessageType(buf) if err != nil { http1Log.Debugf("failed to identity message type, %v", err) if buf.SkipCurrentElement() { break } continue } var result enums.ParseResult switch messageType { case reader.MessageTypeRequest: result, err = p.handleRequest(metrics, buf) case reader.MessageTypeResponse: result, err = p.handleResponse(metrics, connection, buf) case reader.MessageTypeUnknown: result = enums.ParseResultSkipPackage } if err != nil { http1Log.Warnf("failed to handle HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d, error: %v", metrics.ConnectionID, metrics.RandomID, buf.Position().DataID(), err) } http1Log.Debugf("readed message, messageType: %v, buf: %p, data id: %d, "+ "connection ID: %d, random ID: %d, metrics : %p, handle result: %d", messageType, buf, buf.Position().DataID(), metrics.ConnectionID, metrics.RandomID, metrics, result) finishReading := false switch result { case enums.ParseResultSuccess: finishReading = buf.RemoveReadElements(false) case enums.ParseResultSkipPackage: finishReading = buf.SkipCurrentElement() log.Debugf("skip current element, data id: %d, buf: %p, connection ID: %d, random ID: %d", buf.Position().DataID(), buf, metrics.ConnectionID, metrics.RandomID) } if finishReading { break } } return nil } func (p *HTTP1Protocol) ForProtocol() enums.ConnectionProtocol { return enums.ConnectionProtocolHTTP } func (p *HTTP1Protocol) handleRequest(metrics *HTTP1Metrics, buf *buffer.Buffer) (enums.ParseResult, error) { req, result, err := p.reader.ReadRequest(buf, true) if err != nil { return enums.ParseResultSkipPackage, err } if result != enums.ParseResultSuccess { return result, nil } metrics.appendRequestToList(req) return result, nil } func (p *HTTP1Protocol) handleResponse(metrics *HTTP1Metrics, connection *PartitionConnection, b *buffer.Buffer) (enums.ParseResult, error) { request := metrics.findMatchesRequest(b.Position().DataID(), b.Position().PrevDataID()) if request == nil { log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d, "+ "required prev data id: %d, current data id: %d", metrics.ConnectionID, metrics.RandomID, b.Position().PrevDataID(), b.Position().DataID()) return enums.ParseResultSkipPackage, nil } // parsing response response, result, err := p.reader.ReadResponse(request, b, true) defer func() { // if parsing response failed, then put the request back to the list if result != enums.ParseResultSuccess { metrics.halfRequests.PushFront(request) } }() if err != nil { return enums.ParseResultSkipPackage, err } else if result != enums.ParseResultSuccess { return result, nil } // getting the request and response, then send to the forwarder if analyzeError := p.analyzer.HandleHTTPData(metrics, connection, request, response); analyzeError != nil { p.appendAnalyzeUnFinished(metrics, request, response) } return enums.ParseResultSuccess, nil } func (p *HTTP1Protocol) appendAnalyzeUnFinished(metrics *HTTP1Metrics, request *reader.Request, response *reader.Response) { metrics.analyzeUnFinished.PushBack(&HTTP1AnalyzeUnFinished{ request: request, response: response, retryCount: 0, }) } func (p *HTTP1Protocol) handleUnFinishedEvents(m *HTTP1Metrics, connection *PartitionConnection) { for element := m.analyzeUnFinished.Front(); element != nil; { unFinished := element.Value.(*HTTP1AnalyzeUnFinished) err := p.analyzer.HandleHTTPData(m, connection, unFinished.request, unFinished.response) if err != nil { unFinished.retryCount++ if unFinished.retryCount < http1AnalyzeMaxRetryCount { element = element.Next() continue } http1Log.Warnf("failed to analyze HTTP1 request and response, connection ID: %d, random ID: %d, "+ "retry count: %d, error: %v", m.ConnectionID, m.RandomID, unFinished.retryCount, err) } next := element.Next() m.analyzeUnFinished.Remove(element) element = next } } func (p *HTTP1Protocol) HandleHTTPData(metrics *HTTP1Metrics, connection *PartitionConnection, request *reader.Request, response *reader.Response) error { details := make([]events.SocketDetail, 0) var allInclude = true var idRange *buffer.DataIDRange details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, request.HeaderBuffer(), idRange, allInclude) details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, request.BodyBuffer(), idRange, allInclude) details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, response.HeaderBuffer(), idRange, allInclude) details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, response.BodyBuffer(), idRange, allInclude) if !allInclude { return fmt.Errorf("cannot found full detail events for HTTP/1.x protocol, "+ "data id: %d-%d, current details count: %d", idRange.From, idRange.To, len(details)) } http1Log.Debugf("found fully HTTP1 request and response, contains %d detail events, "+ "connection ID: %d, random ID: %d, data range: %d-%d(%t)", len(details), metrics.ConnectionID, metrics.RandomID, idRange.From, idRange.To, idRange.IsToBufferReadFinished) originalRequest := request.Original() originalResponse := response.Original() // delete details(each request or response is fine because it's will delete the original buffer) idRange.DeleteDetails(request.HeaderBuffer()) defer func() { p.CloseStream(originalRequest.Body) p.CloseStream(originalResponse.Body) }() host := request.Headers().Get("Host") if host == "" && originalRequest.URL != nil { host = originalRequest.URL.Host } forwarder.SendTransferProtocolEvent(p.ctx, common.NewProtocolLogEvent(details, &v3.AccessLogProtocolLogs{ Protocol: &v3.AccessLogProtocolLogs_Http{ Http: &v3.AccessLogHTTPProtocol{ StartTime: forwarder.BuildOffsetTimestamp(details[0].GetStartTime()), EndTime: forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()), Version: v3.AccessLogHTTPProtocolVersion_HTTP1, Request: &v3.AccessLogHTTPProtocolRequest{ Method: TransformHTTPMethod(originalRequest.Method), Path: originalRequest.URL.Path, SizeOfHeadersBytes: uint64(request.HeaderBuffer().DataSize()), SizeOfBodyBytes: uint64(request.BodyBuffer().DataSize()), Trace: AnalyzeTraceInfo(func(key string) string { return originalRequest.Header.Get(key) }, http1Log), Host: host, }, Response: &v3.AccessLogHTTPProtocolResponse{ StatusCode: int32(originalResponse.StatusCode), SizeOfHeadersBytes: uint64(response.HeaderBuffer().DataSize()), SizeOfBodyBytes: uint64(response.BodyBuffer().DataSize()), }, }, }, })) return nil } func (p *HTTP1Protocol) OnProtocolBreak(metrics *HTTP1Metrics, connection *PartitionConnection) { } func (p *HTTP1Protocol) CloseStream(ioReader io.Closer) { if ioReader != nil { _ = ioReader.Close() } } func (m *HTTP1Metrics) appendRequestToList(req *reader.Request) { if m.halfRequests.Len() == 0 { m.halfRequests.PushFront(req) return } if m.halfRequests.Back().Value.(*reader.Request).MinDataID() < req.MinDataID() { m.halfRequests.PushBack(req) return } beenAdded := false for element := m.halfRequests.Front(); element != nil; element = element.Next() { existEvent := element.Value.(*reader.Request) if existEvent.MinDataID() > req.MinDataID() { m.halfRequests.InsertBefore(req, element) beenAdded = true break } } if !beenAdded { m.halfRequests.PushBack(req) } } func (m *HTTP1Metrics) findMatchesRequest(currentDataID, prevDataID uint64) *reader.Request { for element := m.halfRequests.Front(); element != nil; element = element.Next() { req := element.Value.(*reader.Request) // if the tail data id of request is equals to the prev data id of response // or tail request data id+1==first response data id, then return the request if uint64(req.MaxDataID()) == prevDataID || uint64(req.MaxDataID()+1) == currentDataID { m.halfRequests.Remove(element) return req } } return nil }