func addDocappenderLibbeatOutputMetrics()

in internal/beatcmd/beat.go [616:670]


func addDocappenderLibbeatOutputMetrics(ctx context.Context, v monitoring.Visitor, sm metricdata.ScopeMetrics) {
	var writeBytes int64

	v.OnRegistryStart()
	v.OnKey("events")
	for _, m := range sm.Metrics {
		switch m.Name {
		case "elasticsearch.events.processed":
			var acked, toomany, failed int64
			data, _ := m.Data.(metricdata.Sum[int64])
			for _, dp := range data.DataPoints {
				status, ok := dp.Attributes.Value(attribute.Key("status"))
				if !ok {
					continue
				}
				switch status.AsString() {
				case "Success":
					acked += dp.Value
				case "TooMany":
					toomany += dp.Value
					fallthrough
				default:
					failed += dp.Value
				}
			}
			monitoring.ReportInt(v, "acked", acked)
			monitoring.ReportInt(v, "failed", failed)
			monitoring.ReportInt(v, "toomany", toomany)
		case "elasticsearch.events.count":
			if value, ok := getScalarInt64(m.Data); ok {
				monitoring.ReportInt(v, "total", value)
			}
		case "elasticsearch.events.queued":
			if value, ok := getScalarInt64(m.Data); ok {
				monitoring.ReportInt(v, "active", value)
			}
		case "elasticsearch.flushed.bytes":
			if value, ok := getScalarInt64(m.Data); ok {
				writeBytes = value
			}
		case "elasticsearch.bulk_requests.count":
			if value, ok := getScalarInt64(m.Data); ok {
				monitoring.ReportInt(v, "batches", value)
			}
		}
	}
	v.OnRegistryFinished()

	if writeBytes > 0 {
		v.OnRegistryStart()
		v.OnKey("write")
		monitoring.ReportInt(v, "bytes", writeBytes)
		v.OnRegistryFinished()
	}
}