in beater/pubsubbeat.go [254:279]
// getOrCreateSubscription returns a handle to the subscription named by
// config.Subscription.Name.
//
// When config.Subscription.Create is false the handle is returned directly,
// without issuing a create call. Otherwise the subscription is created on
// config.Topic with the configured retention settings. The outcome of the
// create call is mapped as follows:
//   - AlreadyExists: treated as success; a handle to the existing
//     subscription is returned.
//   - NotFound: reported as an error naming the configured topic.
//   - any other non-nil error: returned with a "fail to create subscription"
//     prefix.
func getOrCreateSubscription(client *pubsub.Client, config *config.Config) (*pubsub.Subscription, error) {
	name := config.Subscription.Name
	if !config.Subscription.Create {
		return client.Subscription(name), nil
	}

	subCfg := pubsub.SubscriptionConfig{
		Topic:               client.Topic(config.Topic),
		RetainAckedMessages: config.Subscription.RetainAckedMessages,
		RetentionDuration:   config.Subscription.RetentionDuration,
	}
	created, err := client.CreateSubscription(context.Background(), name, subCfg)

	// status.FromError also reports ok for a nil error (with code OK), so a
	// successful create matches none of the cases below and falls through.
	st, isStatus := status.FromError(err)
	switch {
	case isStatus && st.Code() == codes.AlreadyExists:
		// The subscription already exists; reuse it.
		return client.Subscription(name), nil
	case isStatus && st.Code() == codes.NotFound:
		return nil, fmt.Errorf("topic %q does not exists", config.Topic)
	case err != nil:
		return nil, fmt.Errorf("fail to create subscription: %v", err)
	}
	return created, nil
}