in pkg/accesslog/collector/protocols/http1.go [84:137]
func (p *HTTP1Protocol) Analyze(connection *PartitionConnection, _ *AnalyzeHelper) error {
metrics := connection.Metrics(enums.ConnectionProtocolHTTP).(*HTTP1Metrics)
buf := connection.Buffer(enums.ConnectionProtocolHTTP)
http1Log.Debugf("ready to analyze HTTP/1 protocol data, connection ID: %d, random ID: %d, data len: %d",
metrics.ConnectionID, metrics.RandomID, buf.DataLength())
p.handleUnFinishedEvents(metrics, connection)
buf.ResetForLoopReading()
for {
if !buf.PrepareForReading() {
return nil
}
messageType, err := p.reader.IdentityMessageType(buf)
if err != nil {
http1Log.Debugf("failed to identity message type, %v", err)
if buf.SkipCurrentElement() {
break
}
continue
}
var result enums.ParseResult
switch messageType {
case reader.MessageTypeRequest:
result, err = p.handleRequest(metrics, buf)
case reader.MessageTypeResponse:
result, err = p.handleResponse(metrics, connection, buf)
case reader.MessageTypeUnknown:
result = enums.ParseResultSkipPackage
}
if err != nil {
http1Log.Warnf("failed to handle HTTP/1.x protocol, connection ID: %d, random ID: %d, data id: %d, error: %v",
metrics.ConnectionID, metrics.RandomID, buf.Position().DataID(), err)
}
http1Log.Debugf("readed message, messageType: %v, buf: %p, data id: %d, "+
"connection ID: %d, random ID: %d, metrics : %p, handle result: %d",
messageType, buf, buf.Position().DataID(), metrics.ConnectionID, metrics.RandomID, metrics, result)
finishReading := false
switch result {
case enums.ParseResultSuccess:
finishReading = buf.RemoveReadElements(false)
case enums.ParseResultSkipPackage:
finishReading = buf.SkipCurrentElement()
log.Debugf("skip current element, data id: %d, buf: %p, connection ID: %d, random ID: %d",
buf.Position().DataID(), buf, metrics.ConnectionID, metrics.RandomID)
}
if finishReading {
break
}
}
return nil
}