in consumer/consumer_client.go [72:103]
func (consumer *ConsumerClient) createConsumerGroup() error {
consumerGroups, err := consumer.client.ListConsumerGroup(consumer.option.Project, consumer.option.Logstore)
if err != nil {
return fmt.Errorf("list consumer group failed: %w", err)
}
alreadyExist := false
for _, cg := range consumerGroups {
if cg.ConsumerGroupName == consumer.consumerGroup.ConsumerGroupName {
alreadyExist = true
if (*cg) != consumer.consumerGroup {
level.Info(consumer.logger).Log("msg", "this config is different from original config, try to override it", "old_config", cg)
} else {
level.Info(consumer.logger).Log("msg", "new consumer join the consumer group", "consumer name", consumer.option.ConsumerName,
"group name", consumer.option.ConsumerGroupName)
return nil
}
}
}
if alreadyExist {
if err := consumer.client.UpdateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil {
return fmt.Errorf("update consumer group failed: %w", err)
}
} else {
if err := consumer.client.CreateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil {
if slsError, ok := err.(*sls.Error); !ok || slsError.Code != "ConsumerGroupAlreadyExist" {
return fmt.Errorf("create consumer group failed: %w", err)
}
}
}
return nil
}