func()

in consumer/consumer_client.go [72:103]


func (consumer *ConsumerClient) createConsumerGroup() error {
	consumerGroups, err := consumer.client.ListConsumerGroup(consumer.option.Project, consumer.option.Logstore)
	if err != nil {
		return fmt.Errorf("list consumer group failed: %w", err)
	}
	alreadyExist := false
	for _, cg := range consumerGroups {
		if cg.ConsumerGroupName == consumer.consumerGroup.ConsumerGroupName {
			alreadyExist = true
			if (*cg) != consumer.consumerGroup {
				level.Info(consumer.logger).Log("msg", "this config is different from original config, try to override it", "old_config", cg)
			} else {
				level.Info(consumer.logger).Log("msg", "new consumer join the consumer group", "consumer name", consumer.option.ConsumerName,
					"group name", consumer.option.ConsumerGroupName)
				return nil
			}
		}
	}
	if alreadyExist {
		if err := consumer.client.UpdateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil {
			return fmt.Errorf("update consumer group failed: %w", err)
		}
	} else {
		if err := consumer.client.CreateConsumerGroup(consumer.option.Project, consumer.option.Logstore, consumer.consumerGroup); err != nil {
			if slsError, ok := err.(*sls.Error); !ok || slsError.Code != "ConsumerGroupAlreadyExist" {
				return fmt.Errorf("create consumer group failed: %w", err)
			}
		}
	}

	return nil
}