in kafka/topiccreator.go [77:105]
// NewTopicCreator returns a TopicCreator bound to m and configured from cfg.
//
// cfg is validated first; a validation failure is returned wrapped. When
// cfg.MeterProvider is nil the global OpenTelemetry meter provider is used.
// The "topics.created.count" counter is registered on that provider, and a
// failure to register it is returned wrapped.
//
// cfg.TopicConfigs is copied, so mutating the map after this call returns
// does not affect the returned TopicCreator.
func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error) {
	if err := cfg.Validate(); err != nil {
		return nil, fmt.Errorf("kafka: invalid topic creator config: %w", err)
	}
	if cfg.MeterProvider == nil {
		cfg.MeterProvider = otel.GetMeterProvider()
	}
	meter := cfg.MeterProvider.Meter("github.com/elastic/apm-queue/kafka")
	created, err := meter.Int64Counter("topics.created.count",
		metric.WithDescription("The number of created topics"),
	)
	if err != nil {
		return nil, fmt.Errorf("failed creating 'topics.created.count' metric: %w", err)
	}

	// Snapshot the caller's map instead of aliasing it: topicConfigs below is
	// derived from the values seen here, so the original-valued copy has to be
	// taken at the same moment or the two could drift apart if the caller
	// later mutates cfg.TopicConfigs.
	var (
		origTopicConfigs map[string]string
		topicConfigs     map[string]*string
	)
	if cfg.TopicConfigs != nil {
		// Keep the nil vs. empty distinction of the input.
		origTopicConfigs = make(map[string]string, len(cfg.TopicConfigs))
	}
	if len(cfg.TopicConfigs) != 0 {
		// topicConfigs stays nil when no topic configs were supplied.
		topicConfigs = make(map[string]*string, len(cfg.TopicConfigs))
		for k, v := range cfg.TopicConfigs {
			origTopicConfigs[k] = v
			topicConfigs[k] = kadm.StringPtr(v)
		}
	}

	return &TopicCreator{
		m:                m,
		partitionCount:   cfg.PartitionCount,
		origTopicConfigs: origTopicConfigs,
		topicConfigs:     topicConfigs,
		created:          created,
	}, nil
}