in internal/beatcmd/beat.go [616:670]
// addDocappenderLibbeatOutputMetrics walks the metrics in sm and reports a
// selection of the "elasticsearch.*" values to the visitor v.
//
// Everything reported while iterating sm.Metrics is emitted between an
// OnRegistryStart/OnRegistryFinished pair under the key "events":
//
//   - "elasticsearch.events.processed" is split by each data point's "status"
//     attribute into "acked", "failed" and "toomany".
//   - "elasticsearch.events.count" is reported as "total".
//   - "elasticsearch.events.queued" is reported as "active".
//   - "elasticsearch.bulk_requests.count" is reported as "batches".
//
// "elasticsearch.flushed.bytes" is remembered and, only when it is greater
// than zero, reported afterwards as "bytes" in a second registry section
// under the key "write".
//
// ctx is not used by this function.
func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
	var flushedBytes int64

	v.OnRegistryStart()
	v.OnKey("events")
	for _, metric := range sm.Metrics {
		// reportKey is set only for metrics that are forwarded as a single
		// scalar; every other case finishes its work and moves on to the
		// next metric.
		var reportKey string

		switch metric.Name {
		case "elasticsearch.events.processed":
			var succeeded, rejected, unsuccessful int64
			// When the data is not a Sum[int64] the assertion yields the
			// zero value, which has no data points, so all three counters
			// are still reported (as zero).
			sum, _ := metric.Data.(metricdata.Sum[int64])
			for _, point := range sum.DataPoints {
				statusAttr, found := point.Attributes.Value(attribute.Key("status"))
				if !found {
					// Data points without a status attribute are not counted.
					continue
				}
				switch statusAttr.AsString() {
				case "Success":
					succeeded += point.Value
				case "TooMany":
					// "TooMany" is tallied on its own and also as a failure.
					rejected += point.Value
					unsuccessful += point.Value
				default:
					unsuccessful += point.Value
				}
			}
			monitoring.ReportInt(v, "acked", succeeded)
			monitoring.ReportInt(v, "failed", unsuccessful)
			monitoring.ReportInt(v, "toomany", rejected)
			continue
		case "elasticsearch.flushed.bytes":
			// Held back until the "events" section has been closed.
			if n, ok := getScalarInt64(metric.Data); ok {
				flushedBytes = n
			}
			continue
		case "elasticsearch.events.count":
			reportKey = "total"
		case "elasticsearch.events.queued":
			reportKey = "active"
		case "elasticsearch.bulk_requests.count":
			reportKey = "batches"
		default:
			continue
		}

		if n, ok := getScalarInt64(metric.Data); ok {
			monitoring.ReportInt(v, reportKey, n)
		}
	}
	v.OnRegistryFinished()

	// The "write" section is omitted entirely unless bytes were recorded.
	if flushedBytes <= 0 {
		return
	}
	v.OnRegistryStart()
	v.OnKey("write")
	monitoring.ReportInt(v, "bytes", flushedBytes)
	v.OnRegistryFinished()
}