internal/managedkafka/fake/fake.go (130 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package fake import ( "context" "fmt" "net" "testing" "cloud.google.com/go/managedkafka/apiv1/managedkafkapb" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/emptypb" longrunningpb "cloud.google.com/go/longrunning/autogen/longrunningpb" ) const ( clusterID = "fake-cluster" topicID = "fake-topic" consumerGroupID = "fake-consumergroup" ) // The reason why we have a fake server is because testing end-to-end will exceed the deadline of 10 minutes. // There is currently no strong support available for maintaining persistent resources either. type fakeManagedKafkaServer struct { managedkafkapb.UnimplementedManagedKafkaServer } func Options(t *testing.T) []option.ClientOption { server := &fakeManagedKafkaServer{} listener, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatal(err) } gsrv := grpc.NewServer() managedkafkapb.RegisterManagedKafkaServer(gsrv, server) fakeServerAddr := listener.Addr().String() go func() { if err := gsrv.Serve(listener); err != nil { panic(err) } }() return []option.ClientOption{ option.WithEndpoint(fakeServerAddr), option.WithoutAuthentication(), option.WithGRPCDialOption(grpc.WithInsecure()), } } func (f *fakeManagedKafkaServer) CreateCluster(ctx context.Context, req *managedkafkapb.CreateClusterRequest) (*longrunningpb.Operation, error) { anypb := &anypb.Any{} err := anypb.MarshalFrom(req.Cluster) if err != nil { return nil, fmt.Errorf("anypb.MarshalFrom got err: %w", err) } return &longrunningpb.Operation{ Done: true, Result: &longrunningpb.Operation_Response{ Response: anypb, }, }, nil } func (f *fakeManagedKafkaServer) DeleteCluster(ctx context.Context, req *managedkafkapb.DeleteClusterRequest) (*longrunningpb.Operation, error) { return &longrunningpb.Operation{ Done: true, Result: &longrunningpb.Operation_Response{ Response: &anypb.Any{}, }, }, nil } func (f *fakeManagedKafkaServer) GetCluster(ctx context.Context, req *managedkafkapb.GetClusterRequest) (*managedkafkapb.Cluster, error) { return &managedkafkapb.Cluster{ Name: clusterID, }, nil } func (f *fakeManagedKafkaServer) ListClusters(ctx context.Context, req *managedkafkapb.ListClustersRequest) (*managedkafkapb.ListClustersResponse, error) { return &managedkafkapb.ListClustersResponse{ Clusters: []*managedkafkapb.Cluster{{ Name: clusterID, }}, }, nil } func (f *fakeManagedKafkaServer) UpdateCluster(ctx context.Context, req *managedkafkapb.UpdateClusterRequest) (*longrunningpb.Operation, error) { anypb := &anypb.Any{} err := anypb.MarshalFrom(req.Cluster) if err != nil { return nil, fmt.Errorf("anypb.MarshalFrom got err: %w", err) } return &longrunningpb.Operation{ Done: true, Result: &longrunningpb.Operation_Response{ Response: anypb, }, }, nil } func (f *fakeManagedKafkaServer) CreateTopic(ctx context.Context, req *managedkafkapb.CreateTopicRequest) (*managedkafkapb.Topic, error) { return req.Topic, nil } func (f *fakeManagedKafkaServer) DeleteTopic(ctx context.Context, req *managedkafkapb.DeleteTopicRequest) (*emptypb.Empty, error) { return nil, nil } func (f *fakeManagedKafkaServer) GetTopic(ctx context.Context, req *managedkafkapb.GetTopicRequest) (*managedkafkapb.Topic, error) { return &managedkafkapb.Topic{ Name: topicID, }, nil } func (f *fakeManagedKafkaServer) ListTopics(ctx context.Context, req *managedkafkapb.ListTopicsRequest) (*managedkafkapb.ListTopicsResponse, error) { return &managedkafkapb.ListTopicsResponse{ Topics: []*managedkafkapb.Topic{{ Name: topicID, }}, }, nil } func (f *fakeManagedKafkaServer) UpdateTopic(ctx context.Context, req *managedkafkapb.UpdateTopicRequest) (*managedkafkapb.Topic, error) { return &managedkafkapb.Topic{ Name: topicID, }, nil } func (f *fakeManagedKafkaServer) DeleteConsumerGroup(ctx context.Context, req *managedkafkapb.DeleteConsumerGroupRequest) (*emptypb.Empty, error) { return nil, nil } func (f *fakeManagedKafkaServer) GetConsumerGroup(ctx context.Context, req *managedkafkapb.GetConsumerGroupRequest) (*managedkafkapb.ConsumerGroup, error) { return &managedkafkapb.ConsumerGroup{ Name: consumerGroupID, }, nil } func (f *fakeManagedKafkaServer) ListConsumerGroups(ctx context.Context, req *managedkafkapb.ListConsumerGroupsRequest) (*managedkafkapb.ListConsumerGroupsResponse, error) { return &managedkafkapb.ListConsumerGroupsResponse{ ConsumerGroups: []*managedkafkapb.ConsumerGroup{{ Name: consumerGroupID, }}, }, nil } func (f *fakeManagedKafkaServer) UpdateConsumerGroup(ctx context.Context, req *managedkafkapb.UpdateConsumerGroupRequest) (*managedkafkapb.ConsumerGroup, error) { return &managedkafkapb.ConsumerGroup{ Name: consumerGroupID, }, nil }