internal/cloudmock/metrics.go (142 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package cloudmock import ( "context" "net" "strings" "sync" "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" metricpb "google.golang.org/genproto/googleapis/api/metric" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" ) type MetricsTestServer struct { lis net.Listener srv *grpc.Server Endpoint string userAgent string createMetricDescriptorReqs []*monitoringpb.CreateMetricDescriptorRequest createTimeSeriesReqs []*monitoringpb.CreateTimeSeriesRequest createServiceTimeSeriesReqs []*monitoringpb.CreateTimeSeriesRequest RetryCount int mu sync.Mutex } func (m *MetricsTestServer) Shutdown() { // this will close mts.lis m.srv.GracefulStop() } // Pops out the CreateMetricDescriptorRequests which the test server has received so far. func (m *MetricsTestServer) CreateMetricDescriptorRequests() []*monitoringpb.CreateMetricDescriptorRequest { m.mu.Lock() defer m.mu.Unlock() reqs := m.createMetricDescriptorReqs m.createMetricDescriptorReqs = nil return reqs } // Pops out the UserAgent from the most recent CreateTimeSeriesRequests or CreateServiceTimeSeriesRequests. func (m *MetricsTestServer) UserAgent() string { m.mu.Lock() defer m.mu.Unlock() ua := m.userAgent m.userAgent = "" return ua } // Pops out the CreateTimeSeriesRequests which the test server has received so far. func (m *MetricsTestServer) CreateTimeSeriesRequests() []*monitoringpb.CreateTimeSeriesRequest { m.mu.Lock() defer m.mu.Unlock() reqs := m.createTimeSeriesReqs m.createTimeSeriesReqs = nil return reqs } // Pops out the CreateServiceTimeSeriesRequests which the test server has received so far. func (m *MetricsTestServer) CreateServiceTimeSeriesRequests() []*monitoringpb.CreateTimeSeriesRequest { m.mu.Lock() defer m.mu.Unlock() reqs := m.createServiceTimeSeriesReqs m.createServiceTimeSeriesReqs = nil return reqs } func (m *MetricsTestServer) appendCreateMetricDescriptorReq(ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest) { m.mu.Lock() defer m.mu.Unlock() m.createMetricDescriptorReqs = append(m.createMetricDescriptorReqs, req) } func (m *MetricsTestServer) appendCreateTimeSeriesReq(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) { m.mu.Lock() defer m.mu.Unlock() m.createTimeSeriesReqs = append(m.createTimeSeriesReqs, req) if md, ok := metadata.FromIncomingContext(ctx); ok { m.userAgent = strings.Join(md.Get("User-Agent"), ";") } } func (m *MetricsTestServer) appendCreateServiceTimeSeriesReq(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) { m.mu.Lock() defer m.mu.Unlock() m.createServiceTimeSeriesReqs = append(m.createServiceTimeSeriesReqs, req) if md, ok := metadata.FromIncomingContext(ctx); ok { m.userAgent = strings.Join(md.Get("User-Agent"), ";") } } func (m *MetricsTestServer) Serve() error { return m.srv.Serve(m.lis) } type fakeMetricServiceServer struct { monitoringpb.UnimplementedMetricServiceServer metricsTestServer *MetricsTestServer } // CreateTimeSeries simulates a call to GCM. // Failed calls can be simulated by putting error codes in the project name for the request, // such as "notfound", "unavailable", and "deadline_exceeded". // For WAL testing, unavailable and deadline exceeded calls will retry once, failing the first // time but succeeding on the second call (eg, network outage). func (f *fakeMetricServiceServer) CreateTimeSeries( ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, ) (*emptypb.Empty, error) { code := codes.OK if strings.Contains(req.Name, "notfound") { code = codes.NotFound } else if strings.Contains(req.Name, "unavailable") && f.metricsTestServer.RetryCount == 0 { f.metricsTestServer.RetryCount++ code = codes.Unavailable } else if strings.Contains(req.Name, "deadline_exceeded") && f.metricsTestServer.RetryCount == 0 { f.metricsTestServer.RetryCount++ code = codes.DeadlineExceeded } successPointCount := int32(len(req.TimeSeries)) if code == codes.NotFound || code == codes.Unavailable || code == codes.DeadlineExceeded { successPointCount = 0 } else { f.metricsTestServer.appendCreateTimeSeriesReq(ctx, req) } statusResp, _ := status.New(code, "").WithDetails( &monitoringpb.CreateTimeSeriesSummary{ TotalPointCount: int32(len(req.TimeSeries)), SuccessPointCount: successPointCount, }) return &emptypb.Empty{}, statusResp.Err() } func (f *fakeMetricServiceServer) CreateServiceTimeSeries( ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest, ) (*emptypb.Empty, error) { f.metricsTestServer.appendCreateServiceTimeSeriesReq(ctx, req) return &emptypb.Empty{}, nil } func (f *fakeMetricServiceServer) CreateMetricDescriptor( ctx context.Context, req *monitoringpb.CreateMetricDescriptorRequest, ) (*metricpb.MetricDescriptor, error) { f.metricsTestServer.appendCreateMetricDescriptorReq(ctx, req) return &metricpb.MetricDescriptor{}, nil } func NewMetricTestServer() (*MetricsTestServer, error) { srv := grpc.NewServer() lis, err := net.Listen("tcp", "localhost:0") if err != nil { return nil, err } testServer := &MetricsTestServer{ Endpoint: lis.Addr().String(), lis: lis, srv: srv, } monitoringpb.RegisterMetricServiceServer( srv, &fakeMetricServiceServer{metricsTestServer: testServer}, ) return testServer, nil }