pubsublite/internal/psltest/util.go (119 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package psltest contains utilities for pubsublite tests. package psltest import ( "context" "fmt" "strings" "testing" "cloud.google.com/go/pubsublite" "google.golang.org/api/iterator" ) // Cleanup deletes all previous test topics/subscriptions from previous test // runs. This prevents previous test failures from building up resources that // count against quota. func Cleanup(t *testing.T, client *pubsublite.AdminClient, proj, region, namePrefix string, zones []string) { ctx := context.Background() topicSubstring := "/topics/" + namePrefix subscriptionSubstring := "/subscriptions/" + namePrefix reservationSubstring := "/reservations/" + namePrefix for _, zone := range zones { parent := fmt.Sprintf("projects/%s/locations/%s", proj, zone) topicIter := client.Topics(ctx, parent) for { topic, err := topicIter.Next() if err == iterator.Done { break } if err != nil { t.Fatalf("topicIter.Next got err: %v", err) } if !strings.Contains(topic.Name, topicSubstring) { continue } if err := client.DeleteTopic(ctx, topic.Name); err != nil { t.Fatalf("AdminClient.DeleteTopic got err: %v", err) } } subIter := client.Subscriptions(ctx, parent) for { sub, err := subIter.Next() if err == iterator.Done { break } if err != nil { t.Fatalf("subIter.Next() got err: %v", err) } if !strings.Contains(sub.Name, subscriptionSubstring) { continue } if err := client.DeleteSubscription(ctx, sub.Name); err != nil { t.Fatalf("AdminClient.DeleteSubscription got err: %v", err) } } } resIter := client.Reservations(ctx, fmt.Sprintf("projects/%s/locations/%s", proj, region)) for { res, err := resIter.Next() if err == iterator.Done { break } if err != nil { t.Fatalf("resIter.Next() got err: %v", err) } if !strings.Contains(res.Name, reservationSubstring) { continue } if err := client.DeleteReservation(ctx, res.Name); err != nil { t.Fatalf("AdminClient.DeleteReservation got err: %v", err) } } } // MustCreateTopic creates a Pub/Sub Lite topic and fails the test if // unsuccessful. func MustCreateTopic(ctx context.Context, t *testing.T, client *pubsublite.AdminClient, topicPath string) *pubsublite.TopicConfig { t.Helper() cfg := DefaultTopicConfig(topicPath) topicConfig, err := client.CreateTopic(ctx, *cfg) if err != nil { t.Fatalf("AdminClient.CreateTopic got err: %v", err) } return topicConfig } // DefaultTopicConfig returns the default topic config for tests. func DefaultTopicConfig(topicPath string) *pubsublite.TopicConfig { cfg := &pubsublite.TopicConfig{ Name: topicPath, PartitionCount: 2, PublishCapacityMiBPerSec: 4, SubscribeCapacityMiBPerSec: 8, PerPartitionBytes: 30 * 1024 * 1024 * 1024, // 30 GiB RetentionDuration: pubsublite.InfiniteRetention, } return cfg } // MustCreateSubscription creates a Pub/Sub Lite subscription and fails the test // if unsuccessful. func MustCreateSubscription(ctx context.Context, t *testing.T, client *pubsublite.AdminClient, topicPath, subPath string) *pubsublite.SubscriptionConfig { t.Helper() cfg := DefaultSubConfig(topicPath, subPath) subConfig, err := client.CreateSubscription(ctx, *cfg) if err != nil { t.Fatalf("AdminClient.CreateSubscription got err: %v", err) } return subConfig } // DefaultSubConfig returns the default subscription config for tests. func DefaultSubConfig(topicPath, subPath string) *pubsublite.SubscriptionConfig { cfg := &pubsublite.SubscriptionConfig{ Name: subPath, Topic: topicPath, DeliveryRequirement: pubsublite.DeliverImmediately, } return cfg } func MustCreateReservation(ctx context.Context, t *testing.T, client *pubsublite.AdminClient, resPath string) *pubsublite.ReservationConfig { t.Helper() cfg := DefaultResConfig(resPath) resConfig, err := client.CreateReservation(ctx, *cfg) if err != nil { t.Fatalf("AdminClient.CreateReservation got err: %v", err) } return resConfig } // DefaultResConfig returns the default reservation config for tests. func DefaultResConfig(resPath string) *pubsublite.ReservationConfig { cfg := &pubsublite.ReservationConfig{ Name: resPath, ThroughputCapacity: 4, } return cfg }