loadgen/eventhandler/apm-collector.go (50 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package eventhandler
import (
"bytes"
"fmt"
"time"
"github.com/tidwall/gjson"
)
var (
metaHeader = []byte(`{"metadata":`)
rumMetaHeader = []byte(`{"m":`)
)
// APMEventCollector extracts relevant metadata and event from
// single line scans.
type APMEventCollector struct{}
// Filter skips processing RUM related events.
func (a *APMEventCollector) Filter(line []byte) error {
if bytes.HasPrefix(line, rumMetaHeader) {
return fmt.Errorf("rum data support not implemented")
}
return nil
}
// IsMeta identifies metadata lines from APM protocol.
func (a *APMEventCollector) IsMeta(line []byte) bool {
return bytes.HasPrefix(line, metaHeader)
}
// Process processes single lines extracting APM events.
// It uniforms events timestamp.
func (a *APMEventCollector) Process(linecopy []byte) event {
event := event{payload: linecopy}
result := gjson.ParseBytes(linecopy)
result.ForEach(func(key, value gjson.Result) bool {
event.objectType = key.Str // lines look like {"span":{...}}
timestampResult := value.Get("timestamp")
if timestampResult.Exists() {
switch timestampResult.Type {
case gjson.Number:
us := timestampResult.Int()
if us >= 0 {
s := us / 1000000
ns := (us - (s * 1000000)) * 1000
event.timestamp = time.Unix(s, ns)
}
case gjson.String:
tstr := timestampResult.Str
for _, f := range supportedTSFormats {
if t, err := time.Parse(f, tstr); err == nil {
event.timestamp = t
break
}
}
}
}
return false
})
return event
}