func()

in kafka/topiccreator.go [77:105]


// NewTopicCreator builds a TopicCreator bound to this Manager from cfg.
//
// cfg is validated first and any validation error is returned wrapped. If
// cfg.MeterProvider is nil, the provider returned by otel.GetMeterProvider is
// used to register the "topics.created.count" counter; a failure to register
// the counter is returned wrapped. Each entry of cfg.TopicConfigs is converted
// with kadm.StringPtr; the converted map is left nil when cfg.TopicConfigs is
// empty. The original cfg.TopicConfigs map is also kept on the returned value.
func (m *Manager) NewTopicCreator(cfg TopicCreatorConfig) (*TopicCreator, error) {
	if err := cfg.Validate(); err != nil {
		return nil, fmt.Errorf("kafka: invalid topic creator config: %w", err)
	}

	// Fall back to the globally registered provider when none was supplied.
	provider := cfg.MeterProvider
	if provider == nil {
		provider = otel.GetMeterProvider()
	}

	createdCounter, err := provider.
		Meter("github.com/elastic/apm-queue/kafka").
		Int64Counter(
			"topics.created.count",
			metric.WithDescription("The number of created topics"),
		)
	if err != nil {
		return nil, fmt.Errorf("failed creating 'topics.created.count' metric: %w", err)
	}

	// Pointer-valued view of the topic configs; stays nil when there are none.
	var ptrConfigs map[string]*string
	if n := len(cfg.TopicConfigs); n > 0 {
		ptrConfigs = make(map[string]*string, n)
		for name, value := range cfg.TopicConfigs {
			ptrConfigs[name] = kadm.StringPtr(value)
		}
	}

	creator := &TopicCreator{
		m:                m,
		partitionCount:   cfg.PartitionCount,
		origTopicConfigs: cfg.TopicConfigs,
		topicConfigs:     ptrConfigs,
		created:          createdCounter,
	}
	return creator, nil
}