pkg/accesslog/collector/protocols/http2.go (286 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package protocols
import (
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3"
)
// maxHTTP2StreamingTime is the max time of an HTTP/2 streaming, if the streaming is too long, then this streaming will split
var maxHTTP2StreamingTime = time.Minute * 3
var http2Log = logger.GetLogger("accesslog", "collector", "protocols", "http2")
type HTTP2StreamAnalyzer interface {
HandleWholeStream(connection *PartitionConnection, stream *HTTP2Streaming) error
OnProtocolBreak(connection *PartitionConnection, metrics *HTTP2Metrics)
}
type HTTP2Protocol struct {
ctx *common.AccessLogContext
analyzer HTTP2StreamAnalyzer
}
func NewHTTP2Analyzer(ctx *common.AccessLogContext, analyzer HTTP2StreamAnalyzer) *HTTP2Protocol {
protocol := &HTTP2Protocol{ctx: ctx}
if analyzer == nil {
protocol.analyzer = protocol
} else {
protocol.analyzer = analyzer
}
return protocol
}
type HTTP2Metrics struct {
ConnectionID uint64
RandomID uint64
HpackDecoder *hpack.Decoder
Streams map[uint32]*HTTP2Streaming
}
type HTTP2Streaming struct {
ReqHeader map[string]string
RespHeader map[string]string
ReqHeaderBuffer *buffer.Buffer
ReqBodyBuffer *buffer.Buffer
IsInResponse bool
Status int
RespHeaderBuffer *buffer.Buffer
RespBodyBuffer *buffer.Buffer
Connection *PartitionConnection
}
func (r *HTTP2Protocol) GenerateConnection(connectionID, randomID uint64) ProtocolMetrics {
return &HTTP2Metrics{
ConnectionID: connectionID,
RandomID: randomID,
HpackDecoder: hpack.NewDecoder(4096, nil),
Streams: make(map[uint32]*HTTP2Streaming),
}
}
func (r *HTTP2Protocol) Analyze(connection *PartitionConnection, helper *AnalyzeHelper) error {
http2Metrics := connection.Metrics(enums.ConnectionProtocolHTTP2).(*HTTP2Metrics)
buf := connection.Buffer(enums.ConnectionProtocolHTTP2)
http2Log.Debugf("ready to analyze HTTP/2 protocol data, connection ID: %d, random ID: %d",
http2Metrics.ConnectionID, http2Metrics.RandomID)
buf.ResetForLoopReading()
for {
if !buf.PrepareForReading() {
return nil
}
startPosition := buf.Position()
header, err := http2.ReadFrameHeader(buf)
if err != nil {
http2Log.Debugf("failed to read frame header, %v", err)
if buf.SkipCurrentElement() {
break
}
continue
}
http2Log.Debugf("current reading buffer data id: %d, seq: %d", startPosition.DataID(), startPosition.Seq())
var protocolBreak bool
var result enums.ParseResult
switch header.Type {
case http2.FrameHeaders:
result, protocolBreak, _ = r.HandleHeader(connection, &header, startPosition, http2Metrics, buf)
case http2.FrameData:
result, protocolBreak, _ = r.HandleData(connection, &header, startPosition, http2Metrics, buf)
default:
tmp := make([]byte, header.Length)
if err := buf.ReadUntilBufferFull(tmp); err != nil {
if errors.Is(err, buffer.ErrNotComplete) {
result = enums.ParseResultSkipPackage
} else {
protocolBreak = true
}
} else {
result = enums.ParseResultSuccess
}
}
// if the protocol break, then stop the loop and notify the caller to skip analyze all data(just sending the detail)
if protocolBreak {
http2Log.Debugf("the HTTP/2 protocol break, maybe not tracing the connection from beginning, skip all data analyze in this connection, "+
"connection ID: %d", http2Metrics.ConnectionID)
helper.ProtocolBreak = true
r.analyzer.OnProtocolBreak(connection, http2Metrics)
break
}
finishReading := false
switch result {
case enums.ParseResultSuccess:
finishReading = buf.RemoveReadElements(false)
case enums.ParseResultSkipPackage:
finishReading = buf.SkipCurrentElement()
}
if finishReading {
break
}
}
return nil
}
func (r *HTTP2Protocol) ForProtocol() enums.ConnectionProtocol {
return enums.ConnectionProtocolHTTP2
}
func (r *HTTP2Protocol) HandleHeader(connection *PartitionConnection, header *http2.FrameHeader, startPos *buffer.Position,
metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, error) {
bytes := make([]byte, header.Length)
if err := buf.ReadUntilBufferFull(bytes); err != nil {
return enums.ParseResultSkipPackage, true, err
}
headerData, err := metrics.HpackDecoder.DecodeFull(bytes)
if err != nil {
// reading the header failure, maybe not tracing the connection from beginning
return enums.ParseResultSkipPackage, true, err
}
// saving stream
streaming := metrics.Streams[header.StreamID]
headers := r.parseHeaders(headerData)
if streaming == nil {
streaming = &HTTP2Streaming{
ReqHeader: headers,
RespHeader: make(map[string]string),
ReqHeaderBuffer: buf.Slice(true, startPos, buf.Position()),
Connection: connection,
}
metrics.Streams[header.StreamID] = streaming
return enums.ParseResultSuccess, false, nil
}
status, contains := headers[":status"]
if contains {
streaming.IsInResponse = true
code, err := strconv.ParseInt(status, 10, 64)
if err != nil {
log.Warnf("cannot parse status code: %s", status)
code = 200
}
streaming.Status = int(code)
}
if !streaming.IsInResponse {
r.AppendHeaders(streaming.ReqHeader, headers)
streaming.ReqHeaderBuffer = buffer.CombineSlices(true, buf, streaming.ReqHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
return enums.ParseResultSuccess, false, nil
}
r.AppendHeaders(streaming.RespHeader, headers)
streaming.RespHeaderBuffer = buffer.CombineSlices(true, buf, streaming.RespHeaderBuffer, buf.Slice(true, startPos, buf.Position()))
// is end of stream and in the response
if header.Flags.Has(http2.FlagHeadersEndStream) {
// should be end of the stream and send to the protocol
_ = r.analyzer.HandleWholeStream(connection, streaming)
// delete streaming
delete(metrics.Streams, header.StreamID)
}
return enums.ParseResultSuccess, false, nil
}
func (r *HTTP2Protocol) validateIsStreamOpenTooLong(connection *PartitionConnection,
metrics *HTTP2Metrics, id uint32, streaming *HTTP2Streaming) {
// if in the response mode or the request body is not nil, then skip
if streaming.IsInResponse || streaming.ReqBodyBuffer == nil {
return
}
// is the body sending too long, then split the stream
socketBuffer := streaming.ReqBodyBuffer.FirstSocketBuffer()
if socketBuffer == nil {
return
}
if time.Since(host.Time(socketBuffer.StartTime())) > maxHTTP2StreamingTime {
http2Log.Debugf("detect the HTTP/2 stream is too long, split the stream, connection ID: %d, stream ID: %d, headers: %v",
metrics.ConnectionID, id, streaming.ReqHeader)
_ = r.analyzer.HandleWholeStream(connection, streaming)
// clean sent buffers
if streaming.ReqBodyBuffer != nil {
streaming.ReqBodyBuffer.Clean()
}
}
}
func (r *HTTP2Protocol) HandleWholeStream(_ *PartitionConnection, stream *HTTP2Streaming) error {
details := make([]events.SocketDetail, 0)
var allInclude = true
var idRange *buffer.DataIDRange
details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.ReqHeaderBuffer, idRange, allInclude)
details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.ReqBodyBuffer, idRange, allInclude)
details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.RespHeaderBuffer, idRange, allInclude)
details, idRange, allInclude = AppendSocketDetailsFromBuffer(details, stream.RespBodyBuffer, idRange, allInclude)
if !allInclude {
return fmt.Errorf("cannot found any detail events for HTTP/2 protocol, data id: %d-%d, current details count: %d",
stream.ReqHeaderBuffer.FirstSocketBuffer().DataID(), stream.RespBodyBuffer.LastSocketBuffer().DataID(),
len(details))
}
idRange.DeleteDetails(stream.ReqHeaderBuffer)
streamHost := stream.ReqHeader[":authority"]
if streamHost == "" {
streamHost = stream.ReqHeader[":host"]
}
forwarder.SendTransferProtocolEvent(r.ctx, common.NewProtocolLogEvent(details, &v3.AccessLogProtocolLogs{
Protocol: &v3.AccessLogProtocolLogs_Http{
Http: &v3.AccessLogHTTPProtocol{
StartTime: forwarder.BuildOffsetTimestamp(r.FirstDetail(stream.ReqBodyBuffer, details[0]).GetStartTime()),
EndTime: forwarder.BuildOffsetTimestamp(details[len(details)-1].GetEndTime()),
Version: v3.AccessLogHTTPProtocolVersion_HTTP2,
Request: &v3.AccessLogHTTPProtocolRequest{
Method: r.ParseHTTPMethod(stream),
Path: stream.ReqHeader[":path"],
SizeOfHeadersBytes: r.BufferSizeOfZero(stream.ReqHeaderBuffer),
SizeOfBodyBytes: r.BufferSizeOfZero(stream.ReqBodyBuffer),
Host: streamHost,
Trace: AnalyzeTraceInfo(func(key string) string {
return stream.ReqHeader[key]
}, http2Log),
},
Response: &v3.AccessLogHTTPProtocolResponse{
StatusCode: int32(stream.Status),
SizeOfHeadersBytes: r.BufferSizeOfZero(stream.RespHeaderBuffer),
SizeOfBodyBytes: r.BufferSizeOfZero(stream.RespBodyBuffer),
},
},
},
}))
return nil
}
func (r *HTTP2Protocol) OnProtocolBreak(connection *PartitionConnection, metrics *HTTP2Metrics) {
}
func (r *HTTP2Protocol) ParseHTTPMethod(streaming *HTTP2Streaming) v3.AccessLogHTTPProtocolRequestMethod {
method := streaming.ReqHeader[":method"]
if method == "" {
return v3.AccessLogHTTPProtocolRequestMethod_Get
}
return TransformHTTPMethod(strings.ToUpper(method))
}
func (r *HTTP2Protocol) FirstDetail(buf *buffer.Buffer, def events.SocketDetail) events.SocketDetail {
if buf == nil {
return def
}
details := buf.BuildDetails()
if details == nil || details.Len() == 0 {
return def
}
return details.Front().Value.(events.SocketDetail)
}
func (r *HTTP2Protocol) BufferSizeOfZero(buf *buffer.Buffer) uint64 {
if buf == nil {
return 0
}
return uint64(buf.DataSize())
}
func (r *HTTP2Protocol) AppendHeaders(exist, needAppends map[string]string) {
for k, v := range needAppends {
exist[k] = v
}
}
func (r *HTTP2Protocol) HandleData(connection *PartitionConnection, header *http2.FrameHeader, startPos *buffer.Position,
metrics *HTTP2Metrics, buf *buffer.Buffer) (enums.ParseResult, bool, error) {
bytes := make([]byte, header.Length)
streaming := metrics.Streams[header.StreamID]
if streaming == nil {
// cannot found the stream, maybe not tracing the connection from beginning
return enums.ParseResultSkipPackage, true, nil
}
if err := buf.ReadUntilBufferFull(bytes); err != nil {
return enums.ParseResultSkipPackage, false, err
}
if !streaming.IsInResponse {
streaming.ReqBodyBuffer = buffer.CombineSlices(true, buf, streaming.ReqBodyBuffer, buf.Slice(true, startPos, buf.Position()))
} else {
streaming.RespBodyBuffer = buffer.CombineSlices(true, buf, streaming.RespBodyBuffer, buf.Slice(true, startPos, buf.Position()))
}
r.validateIsStreamOpenTooLong(connection, metrics, header.StreamID, streaming)
return enums.ParseResultSuccess, false, nil
}
func (r *HTTP2Protocol) parseHeaders(headers []hpack.HeaderField) map[string]string {
result := make(map[string]string)
for _, header := range headers {
result[header.Name] = header.Value
}
return result
}