in kafka/manager.go [67:92]
// NewManager returns a Manager built from cfg.
//
// The configuration is finalized first; a failure there is returned wrapped
// as an invalid-config error. A Kafka client is then created from the
// configuration and wrapped in an admin client. When cfg.MeterProvider is
// nil, the global OpenTelemetry meter provider is used to register the
// "topics.deleted.count" counter.
//
// On any error the returned *Manager is nil and no Kafka client is left open.
func NewManager(cfg ManagerConfig) (*Manager, error) {
	if err := cfg.finalize(); err != nil {
		return nil, fmt.Errorf("kafka: invalid manager config: %w", err)
	}
	client, err := cfg.newClient(nil)
	if err != nil {
		return nil, fmt.Errorf("kafka: failed creating kafka client: %w", err)
	}
	// cfg is a by-value copy, so defaulting the provider here only affects the
	// configuration stored on the returned Manager, not the caller's value.
	if cfg.MeterProvider == nil {
		cfg.MeterProvider = otel.GetMeterProvider()
	}
	meter := cfg.MeterProvider.Meter("github.com/elastic/apm-queue/kafka")
	deleted, err := meter.Int64Counter("topics.deleted.count",
		metric.WithDescription("The number of deleted topics"),
	)
	if err != nil {
		// The client was already created above; close it so that it is not
		// leaked when construction fails past this point.
		client.Close()
		return nil, fmt.Errorf("failed creating 'topics.deleted.count' metric: %w", err)
	}
	return &Manager{
		cfg:         cfg,
		client:      client,
		adminClient: kadm.NewClient(client),
		tracer:      cfg.tracerProvider().Tracer("kafka"),
		deleted:     deleted,
	}, nil
}