func()

in pkg/accesslog/collector/protocols/http1.go [84:137]


// Analyze walks the HTTP data buffered on the connection and hands every
// identified message to the request or response handler.
//
// It first calls handleUnFinishedEvents, then restarts reading on the buffer
// and loops until either PrepareForReading reports false or one of the buffer
// operations (RemoveReadElements / SkipCurrentElement) reports that reading is
// finished. Elements whose message type cannot be identified, or is unknown,
// are skipped.
//
// Identification and handler failures are only logged, so the returned error
// is always nil. The method panics if the metrics registered for
// enums.ConnectionProtocolHTTP are not a *HTTP1Metrics.
func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error {
	metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
	buf := connection.Buffer(enums.ConnectionProtocolHTTP)
	http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d",
		metrics.ConnectionID, metrics.RandomID, buf.DataLength())
	p.handleUnFinishedEvents(metrics, connection)
	buf.ResetForLoopReading()
	for {
		if !buf.PrepareForReading() {
			return nil
		}

		messageType, err := p.reader.IdentityMessageType(buf)
		if err != nil {
			http1Log.Debugf("failed to identity message type, %v", err)
			if buf.SkipCurrentElement() {
				break
			}
			continue
		}

		var result enums.ParseResult
		switch messageType {
		case reader.MessageTypeRequest:
			result, err = p.handleRequest(metrics, buf)
		case reader.MessageTypeResponse:
			result, err = p.handleResponse(metrics, connection, buf)
		case reader.MessageTypeUnknown:
			result = enums.ParseResultSkipPackage
		}

		// Read the data ID once, before the buffer is modified below, so that
		// every log line of this iteration refers to the same element.
		// NOTE(review): SkipCurrentElement presumably moves the position; reading
		// the ID after it would then describe the following element - confirm.
		dataID := buf.Position().DataID()
		if err != nil {
			// A handler failure is not fatal: the result it returned still
			// decides how the buffer is advanced.
			http1Log.Warnf("failed to handle HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d, error: %v",
				metrics.ConnectionID, metrics.RandomID, dataID, err)
		}

		http1Log.Debugf("readed message, messageType: %v, buf: %p, data id: %d, "+
			"connection ID: %d, random ID: %d, metrics : %p, handle result: %d",
			messageType, buf, dataID, metrics.ConnectionID, metrics.RandomID, metrics, result)

		// Any result other than the two handled here leaves finishReading false,
		// so the loop goes straight back to PrepareForReading.
		finishReading := false
		switch result {
		case enums.ParseResultSuccess:
			finishReading = buf.RemoveReadElements(false)
		case enums.ParseResultSkipPackage:
			finishReading = buf.SkipCurrentElement()
			http1Log.Debugf("skip current element, data id: %d, buf: %p, connection ID: %d, random ID: %d",
				dataID, buf, metrics.ConnectionID, metrics.RandomID)
		}

		if finishReading {
			break
		}
	}
	return nil
}