func()

in pcap-cli/internal/transformer/json_translator.go [1309:1713]


// trySetHTTP inspects the application-layer payload carried by a packet and,
// when it looks like clear-text HTTP (HTTP/1.1 or h2c), describes it in `json`.
//
// Detection is done with the package-level HTTP/1.1 request/response regular
// expressions, the HTTP/2 preface regular expression and, as a last resort, by
// trying to read an HTTP/2 frame from the payload.
//
// Parameters:
//   - ctx, tcpFlags: forwarded to the flow lock when it is released.
//   - packet, flowID, sequence: forwarded to the HTTP/1.1 request recording and
//     response linking helpers.
//   - lock: released by a deferred call once the payload has been recognized
//     as HTTP; it is NOT touched when the payload is not HTTP (handled=false).
//   - appLayerData: the raw payload to analyze.
//   - json: root container of the packet; receives the "HTTP" object, and for
//     HTTP/1.1 also the quasi-RAW "L7" object, plus "message" and "ll".
//   - message: base text used to build the "message" value.
//   - tsp: invoked with a stream ID; yields the trace/span known for that
//     stream and whether the stream is being traced.
//
// Returns the container that was populated (the "HTTP" object when the payload
// was recognized, `json` otherwise), whether the payload was handled as HTTP,
// and whether it was handled as HTTP/2 (h2c).
func (t *JSONPcapTranslator) trySetHTTP(
	ctx context.Context,
	packet *gopacket.Packet,
	lock *flowLock,
	flowID *uint64,
	tcpFlags *uint8,
	sequence *uint32,
	appLayerData []byte,
	json *gabs.Container,
	message *string,
	tsp TraceAndSpanProvider,
) (*gabs.Container, bool /* handled */, bool /* isHTTP2 */) {
	isHTTP11Request := http11RequestPayloadRegex.Match(appLayerData)
	isHTTP11Response := !isHTTP11Request && http11ResponsePayloadRegex.Match(appLayerData)

	isHTTP2 := !isHTTP11Request && !isHTTP11Response && http2PrefaceRegex.Match(appLayerData)
	// the payload may start directly with an h2 frame ( no connection preface ),
	// so always try to read one frame before giving up.
	framer := http2.NewFramer(io.Discard, bytes.NewReader(appLayerData))
	frame, frameErr := framer.ReadFrame()

	// if content is not HTTP in clear text, abort
	// ( the deferred unlock below is intentionally not registered yet )
	if !isHTTP11Request && !isHTTP11Response && !isHTTP2 && frame == nil {
		return json, false, false
	}

	// SETs are used to avoid duplicates
	streams := mapset.NewThreadUnsafeSet[uint32]()
	requestStreams := mapset.NewThreadUnsafeSet[uint32]()
	responseStreams := mapset.NewThreadUnsafeSet[uint32]()
	dataStreams := mapset.NewThreadUnsafeSet[uint32]()
	requestTS := make(map[uint32]*traceAndSpan)
	responseTS := make(map[uint32]*traceAndSpan)

	// making at least 1 big assumption:
	//   HTTP request/status line and headers fit in 1 packet ( TCP segment )
	//     which is not always the case when fragmentation occurs
	L7, _ := json.Object("HTTP")

	// release the flow lock on every return path from here on, reporting the
	// request/response streams seen in this packet when there are any.
	defer func() {
		var lockLatency *time.Duration
		if requestStreams.Cardinality() > 0 ||
			responseStreams.Cardinality() > 0 {
			_, lockLatency = lock.UnlockWithTraceAndSpan(
				ctx, tcpFlags, isHTTP2,
				requestStreams.ToSlice(),
				responseStreams.ToSlice(),
				requestTS, responseTS,
			)
		} else {
			_, lockLatency = lock.UnlockWithTCPFlags(ctx, tcpFlags)
		}
		// guard the dereference: a nil latency must not panic inside a `defer`
		if lockLatency != nil {
			json.Set(lockLatency.String(), "ll")
		}
	}()

	if isHTTP2 {
		L7.Set(true, "preface")
		// strip the connection preface; frames may follow it in the same segment
		h2cData := http2PrefaceRegex.ReplaceAll(appLayerData, nil)
		if len(h2cData) == 0 {
			L7.Set("h2c", "proto")
			L7.Set(string(appLayerData), "raw")
			return L7, true, true
		}
		framer = http2.NewFramer(io.Discard, bytes.NewReader(h2cData))
		frame, frameErr = framer.ReadFrame()
	}

	isHTTP2 = (isHTTP2 || frame != nil)

	// handle h2c traffic
	if isHTTP2 {
		L7.Set("h2c", "proto")
		streamsJSON, _ := L7.Object("streams")

		// multiple h2 frames ( from multiple streams ) may be delivered by the same packet
		for frame != nil {

			isRequest := false
			isResponse := false

			frameHeader := frame.Header()

			// h2 is multiplexed, `StreamID` allows to link HTTP conversations
			//   - see: https://datatracker.ietf.org/doc/html/rfc9113#name-stream-identifiers
			//     - Streams initiated by a client MUST use odd-numbered stream identifiers
			//     - Streams initiated by the server MUST use even-numbered stream identifiers
			//     - A stream identifier of zero (0x00) is used for connection control messages
			//     - Stream identifiers cannot be reused.
			// A stream is equal to a single HTTP conversation: request and response.
			StreamID := frameHeader.StreamID
			StreamIDstr := strconv.FormatUint(uint64(StreamID), 10)
			streams.Add(StreamID)

			ts, traced := tsp(&StreamID)

			// frames are grouped by stream: `streams.<id>.frames[]`
			var stream, frames *gabs.Container
			if stream = streamsJSON.S(StreamIDstr); stream == nil {
				stream, _ = streamsJSON.Object(StreamIDstr)
				_, _ = stream.Array("frames")
				stream.Set(StreamID, "id")
			} else if frames = stream.S("frames"); frames == nil {
				_, _ = stream.Array("frames")
			}

			frameJSON := gabs.New()
			stream.ArrayAppend(frameJSON, "frames")

			if m := http2RawFrameRegex.
				FindStringSubmatch(frameHeader.String()); len(m) > 0 {
				frameJSON.Set(m[1], "raw")
			}

			sizeOfFrame := frameHeader.Length /* uint32 */
			frameJSON.Set(sizeOfFrame, "len")

			// see: https://pkg.go.dev/golang.org/x/net/http2#Flags
			flagsJSON, _ := frameJSON.Object("flags")
			flagsJSON.Set("0b"+strconv.FormatUint(uint64(frameHeader.Flags /* uint8 */), 2), "bin")
			flagsJSON.Set("0x"+strconv.FormatUint(uint64(frameHeader.Flags /* uint8 */), 16), "hex")
			flagsJSON.Set(strconv.FormatUint(uint64(frameHeader.Flags /* uint8 */), 10), "dec")

			// trace/span extracted from the headers carried by this frame, if any
			var _ts *traceAndSpan

			switch frame := frame.(type) {
			case *http2.GoAwayFrame:
				frameJSON.Set("goaway", "type")

			case *http2.RSTStreamFrame:
				frameJSON.Set("rst", "type")

			case *http2.PingFrame:
				frameJSON.Set("ping", "type")
				frameJSON.Set(frame.IsAck(), "ack")
				frameJSON.Set(string(frame.Data[:]), "data")

			case *http2.SettingsFrame:
				frameJSON.Set("settings", "type")
				settings, _ := frameJSON.Object("settings")
				frame.ForeachSetting(func(s http2.Setting) error {
					// see: https://pkg.go.dev/golang.org/x/net/http2#SettingID
					settings.Set(strconv.FormatUint(uint64(s.Val), 10),
						"0x"+strconv.FormatUint(uint64(s.ID), 16))
					return nil
				})
				frameJSON.Set(frame.IsAck(), "ack")

			case *http2.HeadersFrame:
				frameJSON.Set("headers", "type")
				// a fresh decoder is used for every frame, so it carries no state
				// from previous header blocks; decoding is best effort and its
				// error is intentionally ignored ( no headers are reported then ).
				decoder := hpack.NewDecoder(2048, nil)
				hf, _ := decoder.DecodeFull(frame.HeaderBlockFragment())
				headers := http.Header{}
				for _, header := range hf {
					// pseudo-headers tell requests and responses apart
					isRequest = (isRequest || (header.Name == ":method"))
					isResponse = (isResponse || (header.Name == ":status"))
					// `Add(...)` internally applies `http.CanonicalHeaderKey(...)`
					headers.Add(header.Name, header.Value)
				}
				decoder.Close()
				if _ts = t.addHTTPHeaders(frameJSON, &headers); _ts != nil {
					_ts.streamID = &StreamID
					if isRequest {
						requestTS[StreamID] = _ts
					} else if isResponse {
						responseTS[StreamID] = _ts
					}
				} else if traced && isResponse {
					// response without trace headers on a traced stream:
					// fall back to the trace/span provided for the stream
					responseTS[StreamID] = ts
				}

			case *http2.MetaHeadersFrame:
				frameJSON.Set("metadata", "type")
				mdJSON, _ := frameJSON.Object("metadata")
				for _, md := range frame.Fields {
					mdJSON.Set(md.Value, md.Name)
				}

			case *http2.DataFrame:
				dataStreams.Add(StreamID)
				frameJSON.Set("data", "type")
				data := frame.Data()
				sizeOfData := int64(sizeOfFrame)
				t.addHTTPBodyDetails(frameJSON, &sizeOfData, bytes.NewReader(data))
			}

			if isRequest {
				requestStreams.Add(StreamID)
				frameJSON.Set("request", "kind")
			} else if isResponse {
				responseStreams.Add(StreamID)
				frameJSON.Set("response", "kind")
			}

			// multiple streams with frames for req/res
			// might arrive within the same TCP segment
			if _ts != nil {
				t.setTraceAndSpan(frameJSON, _ts)
			} else if traced {
				t.setTraceAndSpan(frameJSON, ts)
			}

			// read next frame
			frame, frameErr = framer.ReadFrame()
		}

		// running out of bytes is the expected way to leave the loop above;
		// anything else means the payload contained a malformed frame.
		if frameErr != nil && frameErr != io.EOF && frameErr != io.ErrUnexpectedEOF {
			errorJSON, _ := L7.Object("error")
			errorJSON.Set("INVALID_HTTP2_FRAME", "code")
			errorJSON.Set(frameErr.Error(), "info")
		}

		streamsJSONbytes, err := streams.MarshalJSON()
		if err == nil {
			L7.Set(string(streamsJSONbytes), "includes")
		} else {
			L7.Set(streams.ToSlice(), "includes")
		}

		// keep the message short for connection-control-only packets
		// and for packets touching many streams
		sizeOfStreams := streams.Cardinality()
		if (sizeOfStreams == 1 && streams.Contains(0)) || sizeOfStreams > 10 {
			json.Set(stringFormatter.Format("{0} | {1}", *message, "h2c"), "message")
		} else {
			json.Set(stringFormatter.Format("{0} | {1} | streams:{2} | req:{3} | res:{4} | data:{5}", *message, "h2c",
				streams.ToSlice(), requestStreams.ToSlice(), responseStreams.ToSlice(), dataStreams.ToSlice()), "message")
		}

		return L7, true, true
	}

	// HTTP/1.1 is not multiplexed, so `StreamID` is always `http11StreamID`
	StreamID := http11StreamID
	ts, traced := tsp(&StreamID)

	streams.Add(StreamID)

	fragmented := false // stop tracking is the default behavior
	defer func() {
		// some HTTP Servers split headers and body by flushing immediately after headers,
		// so if this packet is carrying an HTTP Response, stop trace-tracking if:
		//   - the packet contains the full HTTP Response body, or more specifically:
		//     - if the `Content-Length` header value is equal to the observed `size-of-payload`:
		//       - which means that the HTTP Response is not fragmented.
		L7.Set(fragmented, "fragmented")
	}()

	// quasiRaw renders one line of the message for the quasi-RAW view:
	// long lines are cut and suffixed with an ellipsis so that the result is
	// never longer than `maxLineLen`; empty lines are made visible.
	const (
		maxLineLen = 128
		ellipsis   = "..."
	)
	quasiRaw := func(line []byte) string {
		switch {
		case len(line) > maxLineLen:
			return string(line[:maxLineLen-len(ellipsis)]) + ellipsis
		case len(line) == 0:
			return "<EMPTY>"
		default:
			return string(line)
		}
	}

	// L7 is a quasi-RAW representation of the HTTP message.
	// see: https://www.rfc-editor.org/rfc/rfc7540#section-8.1.3
	dataBytes := bytes.SplitN(appLayerData, http11BodySeparator, 2)
	// `parts` is everything before HTTP payload separator (`2*line-break`)
	//   - it includes: the HTTP line, and HTTP headers
	parts := bytes.Split(dataBytes[0], http11Separator)
	meta, _ := json.Object("L7") // `parts[0]` is the HTTP/1.1 preface
	meta.Set("HTTP", "proto")
	meta.Set(string(parts[0]), "preface")
	metaHeaders, _ := meta.ArrayOfSize(len(parts)-1, "headers")
	// HTTP headers starts at `parts[1]`
	for i, header := range parts[1:] {
		metaHeaders.SetIndex(quasiRaw(header), i)
	}
	if len(dataBytes) > 1 {
		parts = bytes.Split(dataBytes[1], http11Separator)
		body, _ := meta.ArrayOfSize(len(parts), "body")
		for i, line := range parts {
			body.SetIndex(quasiRaw(line), i)
		}
	}

	httpDataReader := bufio.NewReaderSize(bytes.NewReader(appLayerData), len(appLayerData))

	// attempt to parse HTTP/1.1 request
	if isHTTP11Request {
		requestStreams.Add(StreamID)

		L7.Set("request", "kind")

		request, err := http.ReadRequest(httpDataReader)

		if (err != nil && err != io.EOF && err != io.ErrUnexpectedEOF) || request == nil {
			errorJSON, _ := L7.Object("error")
			errorJSON.Set("INVALID_HTTP11_REQUEST", "code")
			// `err` may be nil on this path: never call `Error()` unguarded
			errInfo := ""
			if err != nil {
				errInfo = err.Error()
				errorJSON.Set(errInfo, "info")
			}
			errorJSON.Set(request != nil, "parsed")

			L7.Set("HTTP/1.1", "proto")

			json.Set(stringFormatter.Format("{0} | {1}: {2}",
				*message, "INVALID_HTTP11_REQUEST", errInfo), "message")

			return L7, true, false
		}

		url := ""
		if _url := request.URL; _url != nil {
			url = _url.String()
		}

		if url == "" {
			// fall back to whatever the request-line regex captured
			if parts := http11RequestPayloadRegex.
				FindSubmatch(appLayerData); len(parts) >= 3 {
				url = string(parts[2])
				L7.Set(url, "url")
				L7.Set("HTTP/1.1", "proto")
			}
			// abort, not safe to continue,
			// the "quasi-RAW" will tell...
			return L7, true, false
		}

		L7.Set(url, "url")
		L7.Set(request.Proto, "proto")
		L7.Set(request.Method, "method")

		if _ts := t.addHTTPHeaders(L7, &request.Header); _ts != nil {
			_ts.streamID = &StreamID
			requestTS[StreamID] = _ts
			// include trace and span id for traceability
			t.setTraceAndSpan(json, _ts)
			t.recordHTTP11Request(packet, flowID, sequence, _ts, &request.Method, &request.Host, &url)
		}

		sizeOfBody := t.addHTTPBodyDetails(L7, &request.ContentLength, request.Body)
		if sizeOfBody > 0 {
			dataStreams.Add(StreamID)
		}
		if cl, clErr := strconv.ParseUint(request.Header.Get(httpContentLengthHeader), 10, 64); clErr == nil {
			// more bytes announced than observed: the rest comes in later segments
			fragmented = cl > sizeOfBody
		}

		json.Set(stringFormatter.Format("{0} | {1} {2} {3}", *message, request.Proto, request.Method, url), "message")

		return L7, true, false
	}

	// attempt to parse HTTP/1.1 response
	if isHTTP11Response {
		responseStreams.Add(StreamID)

		L7.Set("response", "kind")

		// Go's `http` implementation may miss the `Transfer-Encoding` header
		//   - see: https://github.com/golang/go/issues/27061
		response, err := http.ReadResponse(httpDataReader, nil)

		if (err != nil && err != io.EOF && err != io.ErrUnexpectedEOF) || response == nil {
			errorJSON, _ := L7.Object("error")
			errorJSON.Set("INVALID_HTTP11_RESPONSE", "code")
			// `err` may be nil on this path: never call `Error()` unguarded
			errInfo := ""
			if err != nil {
				errInfo = err.Error()
				errorJSON.Set(errInfo, "info")
			}
			errorJSON.Set(response != nil, "parsed")

			L7.Set("HTTP/1.1", "proto")

			json.Set(stringFormatter.Format("{0} | {1}: {2}",
				*message, "INVALID_HTTP11_RESPONSE", errInfo), "message")

			return L7, true, false
		}

		L7.Set(response.Proto, "proto")
		L7.Set(response.StatusCode, "code")
		L7.Set(response.Status, "status")

		if _ts := t.addHTTPHeaders(L7, &response.Header); _ts != nil {
			_ts.streamID = &StreamID
			responseTS[StreamID] = _ts
			// include trace and span id for traceability
			t.setTraceAndSpan(json, _ts)
			if linkErr := t.linkHTTP11ResponseToRequest(packet, flowID, L7, _ts); linkErr != nil {
				io.WriteString(os.Stderr, linkErr.Error()+"\n")
			}
		} else if traced {
			responseTS[StreamID] = ts
			t.setTraceAndSpan(json, ts)
			// NOTE(review): unlike the branch above, a linking failure is not
			// reported here; kept as is, confirm that this is intentional.
			_ = t.linkHTTP11ResponseToRequest(packet, flowID, L7, ts)
		}

		sizeOfBody := t.addHTTPBodyDetails(L7, &response.ContentLength, response.Body)
		if sizeOfBody > 0 {
			dataStreams.Add(StreamID)
		}
		if cl, clErr := strconv.ParseUint(response.Header.Get(httpContentLengthHeader), 10, 64); clErr == nil {
			// if content-length is greater than the size of body:
			//   - this HTTP message is fragmented and so there's more to come
			fragmented = cl > sizeOfBody
		}

		json.Set(stringFormatter.Format("{0} | {1} {2}", *message, response.Proto, response.Status), "message")

		return L7, true, false
	}

	return json, true, false
}