in pcap-cli/internal/transformer/json_translator.go [1309:1713]
// trySetHTTP attempts to decode `appLayerData` as clear-text HTTP traffic and,
// when successful, records the decoded HTTP/1.1 message or h2c frames into `json`.
//
// Detection:
//   - HTTP/1.1 request line (`http11RequestPayloadRegex`);
//   - HTTP/1.1 status line (`http11ResponsePayloadRegex`);
//   - HTTP/2 connection preface (`http2PrefaceRegex`), or any payload from
//     which at least one HTTP/2 frame can be read (h2c without preface).
//
// Returns:
//   - the `HTTP` object created inside `json` (or `json` itself when the
//     payload is not recognized as HTTP);
//   - handled: `false` only when the payload is not recognized as HTTP;
//   - isHTTP2: `true` when the payload was processed as h2c.
//
// When the payload is recognized as HTTP, `lock` is released when this method
// returns, and the reported lock latency (if any) is stored as `ll` in `json`.
// The `message` value is used as the prefix of the `message` field written to `json`.
func (t *JSONPcapTranslator) trySetHTTP(
	ctx context.Context,
	packet *gopacket.Packet,
	lock *flowLock,
	flowID *uint64,
	tcpFlags *uint8,
	sequence *uint32,
	appLayerData []byte,
	json *gabs.Container,
	message *string,
	tsp TraceAndSpanProvider,
) (*gabs.Container, bool /* handled */, bool /* isHTTP2 */) {
	isHTTP11Request := http11RequestPayloadRegex.Match(appLayerData)
	isHTTP11Response := !isHTTP11Request && http11ResponsePayloadRegex.Match(appLayerData)
	isHTTP2 := !isHTTP11Request && !isHTTP11Response && http2PrefaceRegex.Match(appLayerData)
	// h2c traffic may not carry the preface: try to read a frame straight from the payload.
	framer := http2.NewFramer(io.Discard, bytes.NewReader(appLayerData))
	frame, frameErr := framer.ReadFrame()
	// if content is not HTTP in clear text, abort.
	// NOTE(review): `lock` is not released on this path; presumably the caller
	// takes care of it when `handled` is `false` – confirm.
	if !isHTTP11Request && !isHTTP11Response && !isHTTP2 && frame == nil {
		return json, false, false
	}
	// SETs are used to avoid duplicates
	streams := mapset.NewThreadUnsafeSet[uint32]()
	requestStreams := mapset.NewThreadUnsafeSet[uint32]()
	responseStreams := mapset.NewThreadUnsafeSet[uint32]()
	dataStreams := mapset.NewThreadUnsafeSet[uint32]()
	requestTS := make(map[uint32]*traceAndSpan)
	responseTS := make(map[uint32]*traceAndSpan)
	// making at least 1 big assumption:
	// HTTP request/status line and headers fit in 1 packet ( TCP segment )
	// which is not always the case when fragmentation occurs
	L7, _ := json.Object("HTTP")
	defer func() {
		// `isHTTP2` and the stream SETs are read when this deferred call runs,
		// so they reflect everything observed while processing this packet.
		var lockLatency *time.Duration
		if requestStreams.Cardinality() > 0 ||
			responseStreams.Cardinality() > 0 {
			_, lockLatency = lock.UnlockWithTraceAndSpan(
				ctx, tcpFlags, isHTTP2,
				requestStreams.ToSlice(),
				responseStreams.ToSlice(),
				requestTS, responseTS,
			)
		} else {
			_, lockLatency = lock.UnlockWithTCPFlags(ctx, tcpFlags)
		}
		// calling `String()` through a nil `*time.Duration` would panic
		// while unwinding this method, so only report a latency when one is available.
		if lockLatency != nil {
			json.Set(lockLatency.String(), "ll")
		}
	}()
	if isHTTP2 {
		L7.Set(true, "preface")
		h2cData := http2PrefaceRegex.ReplaceAll(appLayerData, nil)
		if len(h2cData) == 0 {
			L7.Set("h2c", "proto")
			L7.Set(string(appLayerData), "raw")
			return L7, true, true
		}
		// re-read frames from the payload that follows the preface
		framer = http2.NewFramer(io.Discard, bytes.NewReader(h2cData))
		frame, frameErr = framer.ReadFrame()
	}
	isHTTP2 = (isHTTP2 || frame != nil)
	// handle h2c traffic
	if isHTTP2 {
		L7.Set("h2c", "proto")
		streamsJSON, _ := L7.Object("streams")
		// multiple h2 frames ( from multiple streams ) may be delivered by the same packet
		for frame != nil {
			isRequest := false
			isResponse := false
			frameHeader := frame.Header()
			// h2 is multiplexed, `StreamID` allows to link HTTP conversations
			// - see: https://datatracker.ietf.org/doc/html/rfc9113#name-stream-identifiers
			// - Streams initiated by a client MUST use odd-numbered stream identifiers
			// - Streams initiated by the server MUST use even-numbered stream identifiers
			// - A stream identifier of zero (0x00) is used for connection control messages
			// - Stream identifiers cannot be reused.
			// A stream is equal to a single HTTP conversation: request and response.
			StreamID := frameHeader.StreamID
			StreamIDstr := strconv.FormatUint(uint64(StreamID), 10)
			streams.Add(StreamID)
			ts, traced := tsp(&StreamID)
			var stream, frames *gabs.Container
			if stream = streamsJSON.S(StreamIDstr); stream == nil {
				stream, _ = streamsJSON.Object(StreamIDstr)
				_, _ = stream.Array("frames")
				stream.Set(StreamID, "id")
			} else if frames = stream.S("frames"); frames == nil {
				_, _ = stream.Array("frames")
			}
			frameJSON := gabs.New()
			stream.ArrayAppend(frameJSON, "frames")
			if m := http2RawFrameRegex.
				FindStringSubmatch(frameHeader.String()); len(m) > 0 {
				frameJSON.Set(m[1], "raw")
			}
			sizeOfFrame := frameHeader.Length /* uint32 */
			frameJSON.Set(sizeOfFrame, "len")
			// see: https://pkg.go.dev/golang.org/x/net/http2#Flags
			flagsJSON, _ := frameJSON.Object("flags")
			flagsJSON.Set("0b"+strconv.FormatUint(uint64(frameHeader.Flags /* uint8 */), 2), "bin")
			flagsJSON.Set("0x"+strconv.FormatUint(uint64(frameHeader.Flags /* uint8 */), 16), "hex")
			flagsJSON.Set(strconv.FormatUint(uint64(frameHeader.Flags /* uint8 */), 10), "dec")
			var _ts *traceAndSpan = nil
			switch frame := frame.(type) {
			case *http2.GoAwayFrame:
				frameJSON.Set("goaway", "type")
			case *http2.RSTStreamFrame:
				frameJSON.Set("rst", "type")
			case *http2.PingFrame:
				frameJSON.Set("ping", "type")
				frameJSON.Set(frame.IsAck(), "ack")
				frameJSON.Set(string(frame.Data[:]), "data")
			case *http2.SettingsFrame:
				frameJSON.Set("settings", "type")
				settings, _ := frameJSON.Object("settings")
				frame.ForeachSetting(func(s http2.Setting) error {
					// see: https://pkg.go.dev/golang.org/x/net/http2#SettingID
					settings.Set(strconv.FormatUint(uint64(s.Val), 10),
						"0x"+strconv.FormatUint(uint64(s.ID), 16))
					return nil
				})
				frameJSON.Set(frame.IsAck(), "ack")
			case *http2.HeadersFrame:
				frameJSON.Set("headers", "type")
				// NOTE(review): a fresh HPACK decoder is used per frame, so header fields
				// that reference the connection's dynamic table cannot be decoded here;
				// the decoding error is ignored and no headers are recorded in that case.
				decoder := hpack.NewDecoder(2048, nil)
				hf, _ := decoder.DecodeFull(frame.HeaderBlockFragment())
				headers := http.Header{}
				for _, header := range hf {
					isRequest = (isRequest || (header.Name == ":method"))
					isResponse = (isResponse || (header.Name == ":status"))
					// `Add(...)` internally applies `http.CanonicalHeaderKey(...)`
					headers.Add(header.Name, header.Value)
				}
				decoder.Close()
				if _ts = t.addHTTPHeaders(frameJSON, &headers); _ts != nil {
					_ts.streamID = &StreamID
					if isRequest {
						requestTS[StreamID] = _ts
					} else if isResponse {
						responseTS[StreamID] = _ts
					}
				} else if traced && isResponse {
					responseTS[StreamID] = ts
				}
			case *http2.MetaHeadersFrame:
				frameJSON.Set("metadata", "type")
				mdJSON, _ := frameJSON.Object("metadata")
				for _, md := range frame.Fields {
					mdJSON.Set(md.Value, md.Name)
				}
			case *http2.DataFrame:
				dataStreams.Add(StreamID)
				frameJSON.Set("data", "type")
				data := frame.Data()
				sizeOfData := int64(sizeOfFrame)
				t.addHTTPBodyDetails(frameJSON, &sizeOfData, bytes.NewReader(data))
			}
			if isRequest {
				requestStreams.Add(StreamID)
				frameJSON.Set("request", "kind")
			} else if isResponse {
				responseStreams.Add(StreamID)
				frameJSON.Set("response", "kind")
			}
			// multiple streams with frames for req/res
			// might arrive within the same TCP segment
			if _ts != nil {
				t.setTraceAndSpan(frameJSON, _ts)
			} else if traced {
				t.setTraceAndSpan(frameJSON, ts)
			}
			// read next frame
			frame, frameErr = framer.ReadFrame()
		}
		// running out of bytes mid-frame is expected ( frames may span TCP segments ),
		// any other framing error is reported.
		if frameErr != nil && frameErr != io.EOF && frameErr != io.ErrUnexpectedEOF {
			errorJSON, _ := L7.Object("error")
			errorJSON.Set("INVALID_HTTP2_FRAME", "code")
			errorJSON.Set(frameErr.Error(), "info")
		}
		streamsJSONbytes, err := streams.MarshalJSON()
		if err == nil {
			L7.Set(string(streamsJSONbytes), "includes")
		} else {
			L7.Set(streams.ToSlice(), "includes")
		}
		sizeOfStreams := streams.Cardinality()
		if (sizeOfStreams == 1 && streams.Contains(0)) || sizeOfStreams > 10 {
			json.Set(stringFormatter.Format("{0} | {1}", *message, "h2c"), "message")
		} else {
			json.Set(stringFormatter.Format("{0} | {1} | streams:{2} | req:{3} | res:{4} | data:{5}", *message, "h2c",
				streams.ToSlice(), requestStreams.ToSlice(), responseStreams.ToSlice(), dataStreams.ToSlice()), "message")
		}
		return L7, true, true
	}
	// HTTP/1.1 is not multiplexed, so `StreamID` is always `1`
	StreamID := http11StreamID
	ts, traced := tsp(&StreamID)
	streams.Add(StreamID)
	fragmented := false // stop tracking is the default behavior
	defer func() {
		// some HTTP Servers split headers and body by flushing immediately after headers,
		// so if this packet is carrying an HTTP Response, stop trace-tracking if:
		// - the packet contains the full HTTP Response body, or more specifically:
		// - if the `Content-Length` header value is equal to the observed `size-of-payload`:
		// - which means that the HTTP Response is not fragmented.
		L7.Set(fragmented, "fragmented")
	}()
	// errString renders `err` for the `message` field without dereferencing a nil error.
	errString := func(err error) string {
		if err == nil {
			return "unknown error"
		}
		return err.Error()
	}
	// renderLine turns one raw HTTP/1.1 line into its quasi-RAW form:
	// lines longer than 128 bytes are cut and marked with `...`,
	// empty lines are replaced by `<EMPTY>`.
	renderLine := func(line []byte) string {
		switch {
		case len(line) > 128:
			return string(line[:128-3]) + "..."
		case len(line) > 0:
			return string(line)
		default:
			return "<EMPTY>"
		}
	}
	// the `L7` JSON key holds a quasi-RAW representation of the HTTP message.
	// see: https://www.rfc-editor.org/rfc/rfc7540#section-8.1.3
	dataBytes := bytes.SplitN(appLayerData, http11BodySeparator, 2)
	// `parts` is everything before HTTP payload separator (`2*line-break`)
	// - it includes: the HTTP line, and HTTP headers
	parts := bytes.Split(dataBytes[0], http11Separator)
	meta, _ := json.Object("L7") // `parts[0]` is the HTTP/1.1 preface
	meta.Set("HTTP", "proto")
	meta.Set(string(parts[0]), "preface")
	metaHeaders, _ := meta.ArrayOfSize(len(parts)-1, "headers")
	// HTTP headers starts at `parts[1]`
	for i, header := range parts[1:] {
		metaHeaders.SetIndex(renderLine(header), i)
	}
	if len(dataBytes) > 1 {
		parts = bytes.Split(dataBytes[1], http11Separator)
		body, _ := meta.ArrayOfSize(len(parts), "body")
		for i, line := range parts {
			body.SetIndex(renderLine(line), i)
		}
	}
	httpDataReader := bufio.NewReaderSize(bytes.NewReader(appLayerData), len(appLayerData))
	// attempt to parse HTTP/1.1 request
	if isHTTP11Request {
		requestStreams.Add(StreamID)
		L7.Set("request", "kind")
		request, err := http.ReadRequest(httpDataReader)
		if (err != nil && err != io.EOF && err != io.ErrUnexpectedEOF) || request == nil {
			errorJSON, _ := L7.Object("error")
			errorJSON.Set("INVALID_HTTP11_REQUEST", "code")
			if err != nil {
				errorJSON.Set(err.Error(), "info")
			}
			errorJSON.Set(request != nil, "parsed")
			L7.Set("HTTP/1.1", "proto")
			json.Set(stringFormatter.Format("{0} | {1}: {2}",
				*message, "INVALID_HTTP11_REQUEST", errString(err)), "message")
			return L7, true, false
		}
		url := ""
		if _url := request.URL; _url != nil {
			url = _url.String()
		}
		if url == "" {
			if parts := http11RequestPayloadRegex.
				FindSubmatch(appLayerData); len(parts) >= 3 {
				url = string(parts[2])
				L7.Set(url, "url")
				L7.Set("HTTP/1.1", "proto")
			}
			// abort, not safe to continue,
			// the "quasi-RAW" will tell...
			return L7, true, false
		}
		L7.Set(url, "url")
		L7.Set(request.Proto, "proto")
		L7.Set(request.Method, "method")
		if _ts := t.addHTTPHeaders(L7, &request.Header); _ts != nil {
			_ts.streamID = &StreamID
			requestTS[StreamID] = _ts
			// include trace and span id for traceability
			t.setTraceAndSpan(json, _ts)
			t.recordHTTP11Request(packet, flowID, sequence, _ts, &request.Method, &request.Host, &url)
		}
		sizeOfBody := t.addHTTPBodyDetails(L7, &request.ContentLength, request.Body)
		if sizeOfBody > 0 {
			dataStreams.Add(StreamID)
		}
		if cl, clErr := strconv.ParseUint(request.Header.Get(httpContentLengthHeader), 10, 64); clErr == nil {
			// if content-length is greater than the size of body:
			// - this HTTP message is fragmented and so there's more to come
			fragmented = cl > sizeOfBody
		}
		json.Set(stringFormatter.Format("{0} | {1} {2} {3}", *message, request.Proto, request.Method, url), "message")
		return L7, true, false
	}
	// attempt to parse HTTP/1.1 response
	if isHTTP11Response {
		responseStreams.Add(StreamID)
		L7.Set("response", "kind")
		// Go's `http` implementation may miss the `Transfer-Encoding` header
		// - see: https://github.com/golang/go/issues/27061
		response, err := http.ReadResponse(httpDataReader, nil)
		if (err != nil && err != io.EOF && err != io.ErrUnexpectedEOF) || response == nil {
			errorJSON, _ := L7.Object("error")
			errorJSON.Set("INVALID_HTTP11_RESPONSE", "code")
			if err != nil {
				errorJSON.Set(err.Error(), "info")
			}
			errorJSON.Set(response != nil, "parsed")
			L7.Set("HTTP/1.1", "proto")
			json.Set(stringFormatter.Format("{0} | {1}: {2}",
				*message, "INVALID_HTTP11_RESPONSE", errString(err)), "message")
			return L7, true, false
		}
		L7.Set(response.Proto, "proto")
		L7.Set(response.StatusCode, "code")
		L7.Set(response.Status, "status")
		if _ts := t.addHTTPHeaders(L7, &response.Header); _ts != nil {
			_ts.streamID = &StreamID
			responseTS[StreamID] = _ts
			// include trace and span id for traceability
			t.setTraceAndSpan(json, _ts)
			if linkErr := t.linkHTTP11ResponseToRequest(packet, flowID, L7, _ts); linkErr != nil {
				io.WriteString(os.Stderr, linkErr.Error()+"\n")
			}
		} else if traced {
			// no trace headers in the response: fall back to the provider's trace
			responseTS[StreamID] = ts
			t.setTraceAndSpan(json, ts)
			if linkErr := t.linkHTTP11ResponseToRequest(packet, flowID, L7, ts); linkErr != nil {
				io.WriteString(os.Stderr, linkErr.Error()+"\n")
			}
		}
		sizeOfBody := t.addHTTPBodyDetails(L7, &response.ContentLength, response.Body)
		if sizeOfBody > 0 {
			dataStreams.Add(StreamID)
		}
		if cl, clErr := strconv.ParseUint(response.Header.Get(httpContentLengthHeader), 10, 64); clErr == nil {
			// if content-length is greater than the size of body:
			// - this HTTP message is fragmented and so there's more to come
			fragmented = cl > sizeOfBody
		}
		json.Set(stringFormatter.Format("{0} | {1} {2}", *message, response.Proto, response.Status), "message")
		return L7, true, false
	}
	return json, true, false
}