continuous_load_testing/client.go (313 lines of code) (raw):

//go:generate sh grpc-proto-gen.sh package main import ( "context" "flag" "fmt" "io" "log" "strings" "sync" "time" empty "continuous_load_testing/proto/grpc/testing/empty" test "continuous_load_testing/proto/grpc/testing/test" "continuous_load_testing/proto/grpc/testing/messages" mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric" "go.opentelemetry.io/contrib/detectors/gcp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" "google.golang.org/grpc" _ "google.golang.org/grpc/balancer/grpclb" // Register the grpclb load balancing policy. _ "google.golang.org/grpc/balancer/rls" // Register the RLS load balancing policy. "google.golang.org/grpc/credentials/google" "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/stats/opentelemetry" _ "google.golang.org/grpc/xds/googledirectpath" // Register xDS resolver required for c2p directpath. ) const ( monitoredResourceName = "k8s_container" metricPrefix = "directpathgrpctesting-pa.googleapis.com/client/" ) var ( directPathServerAddr = "google-c2p:///directpathgrpctesting-pa.googleapis.com" cloudPathServerAddr = "dns:///directpathgrpctesting-pa.googleapis.com" concurrency = flag.Int("concurrency", 1, "Number of concurrent workers (default 1)") numOfRequests = flag.Int("num_of_requests", 10, "Total number of rpc requests to make (default 10)") disableDirectPath = flag.Bool("disable_directpath", false, "If true, use CloudPath instead of DirectPath (default is false)") methodsInput = flag.String("methods", "", "Comma-separated list of methods to use (e.g., EmptyCall, UnaryCall)") methods = map[string]bool{ "EmptyCall": false, "UnaryCall": false, "StreamingInputCall": false, "StreamingOutputCall": false, "FullDuplexCall": false, "HalfDuplexCall": false, } ) func createExporter() (metric.Exporter, error) { exporter, err := mexporter.New( mexporter.WithMetricDescriptorTypeFormatter(metricFormatter), mexporter.WithCreateServiceTimeSeries(), ) if err != nil { return nil, fmt.Errorf("creating metrics exporter: %w", err) } log.Println("exporter done") return exporter, nil } // newGRPCLoadTestMonitoredResource initializes a new resource for the gRPC load test client. func newGrpcLoadTestMonitoredResource(ctx context.Context) (*resource.Resource, error) { res, err := resource.New(ctx, resource.WithDetectors(gcp.NewDetector()), resource.WithTelemetrySDK(), resource.WithFromEnv(), resource.WithAttributes( attribute.String("gcp.resource_type", monitoredResourceName), ), ) if err != nil { return nil, fmt.Errorf("creating monitored resource: %w", err) } return res, nil } // setupOpenTelemetry sets up OpenTelemetry for the gRPC load test client, initializing the exporter and provider. func setupOpenTelemetry() ([]grpc.DialOption, error) { ctx := context.Background() var exporter metric.Exporter res, err := newGrpcLoadTestMonitoredResource(ctx) if err != nil { log.Fatalf("Failed to create monitored resource: %v", err) } exporter, err = createExporter() if err != nil { log.Fatalf("Failed to create exporter: %v", err) } log.Println("Created exporter.") meterOpts := []metric.Option{ metric.WithResource(res), metric.WithReader(metric.NewPeriodicReader(exporter, metric.WithInterval(90*time.Second))), } provider := metric.NewMeterProvider(meterOpts...) log.Println("provider done.") mo := opentelemetry.MetricsOptions{ MeterProvider: provider, Metrics: stats.NewMetrics( "grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights", "grpc.lb.rls.cache_entries", "grpc.lb.rls.cache_size", "grpc.lb.rls.default_target_picks", "grpc.lb.rls.target_picks", "grpc.lb.rls.failed_picks", "grpc.xds_client.connected", "grpc.xds_client.server_failure", "grpc.xds_client.resource_updates_valid", "grpc.xds_client.resource_updates_invalid", "grpc.xds_client.resources", "grpc.client.attempt.sent_total_compressed_message_size", "grpc.client.attempt.rcvd_total_compressed_message_size", "grpc.client.attempt.started", "grpc.client.attempt.duration", "grpc.client.call.duration", ), OptionalLabels: []string{"grpc.lb.locality"}, } opts := []grpc.DialOption{ opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo}), grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{}), } return opts, nil } func metricFormatter(m metricdata.Metrics) string { return metricPrefix + strings.ReplaceAll(string(m.Name), ".", "/") } // executeMethod executes the RPC call for a specific method with concurrency. func executeMethod(methodName string, methodFunc func(context.Context, test.TestServiceClient) error, stub test.TestServiceClient) { var wg sync.WaitGroup log.Printf("Concurrency level: %d", *concurrency) for i := 0; i < *concurrency; i++ { wg.Add(1) go func(i int) { defer wg.Done() ctx := context.Background() log.Printf("Starting concurrency goroutine #%d for method: %s", i, methodName) for { err := methodFunc(ctx, stub) if err != nil { log.Printf("Error executing %s #%d: %v", methodName, i, err) } } }(i) } } func ExecuteEmptyCalls(ctx context.Context, tc test.TestServiceClient) error { _, err := tc.EmptyCall(ctx, &empty.Empty{}) if err != nil { return fmt.Errorf("EmptyCall RPC failed: %v", err) } return nil } func ExecuteUnaryCalls(ctx context.Context, tc test.TestServiceClient) error { req := &messages.SimpleRequest{} _, err := tc.UnaryCall(ctx, req) if err != nil { return fmt.Errorf("UnaryCall RPC failed: %v", err) } return nil } func ExecuteStreamingInputCalls(ctx context.Context, tc test.TestServiceClient) error { stream, err := tc.StreamingInputCall(ctx) if err != nil { return fmt.Errorf("%v.StreamingInputCall(_) = _, %v", tc, err) } for i := 0; i < *numOfRequests; i++ { req := &messages.StreamingInputCallRequest{} if err := stream.Send(req); err != nil { return fmt.Errorf("%v has error %v while sending %v", stream, err, req) } } _, err = stream.CloseAndRecv() if err != nil { return fmt.Errorf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) } return nil } func ExecuteStreamingOutputCalls(ctx context.Context, tc test.TestServiceClient) error { req := &messages.StreamingOutputCallRequest{} stream, err := tc.StreamingOutputCall(ctx, req) if err != nil { return fmt.Errorf("%v.StreamingOutputCall(_) = _, %v", tc, err) } var rpcStatus error var index int for { _, err := stream.Recv() if err != nil { rpcStatus = err break } index++ } if rpcStatus != io.EOF { return fmt.Errorf("failed to finish the server streaming rpc: %v", rpcStatus) } return nil } func ExecuteFullDuplexCalls(ctx context.Context, tc test.TestServiceClient) error { stream, err := tc.FullDuplexCall(ctx) if err != nil { return fmt.Errorf("%v.FullDuplexCall(_) = _, %v", tc, err) } for i := 0; i < *numOfRequests; i++ { req := &messages.StreamingOutputCallRequest{} if err = stream.Send(req); err != nil { return fmt.Errorf("%v has error %v while sending %v", stream, err, req) } _, err := stream.Recv() if err != nil { return fmt.Errorf("%v.Recv() = %v", stream, err) } } if err = stream.CloseSend(); err != nil { return fmt.Errorf("error closing send stream: %v", err) } if _, err = stream.Recv(); err != io.EOF { return fmt.Errorf("stream didn't complete successfully: %v", err) } return nil } func ExecuteHalfDuplexCalls(ctx context.Context, tc test.TestServiceClient) error { stream, err := tc.HalfDuplexCall(ctx) if err != nil { return fmt.Errorf("%v.HalfDuplexCall(_) = _, %v", tc, err) } for i := 0; i < *numOfRequests; i++ { req := &messages.StreamingOutputCallRequest{} if err = stream.Send(req); err != nil { return fmt.Errorf("%v has error %v while sending %v", stream, err, req) } } if err = stream.CloseSend(); err != nil { return fmt.Errorf("%v.CloseSend() got %v, want %v", stream, err, nil) } for i := 0; i < *numOfRequests; i++ { _, err := stream.Recv() if err != nil { return fmt.Errorf("%v.Recv() = %v", stream, err) } } if _, err = stream.Recv(); err != io.EOF { return fmt.Errorf("%v failed to complete the HalfDuplexCalls: %v", stream, err) } return nil } func main() { log.Println("DirectPath Continuous Load Testing Client Started - test15.") log.Printf("Concurrency level: %d", *concurrency) flag.Parse() var serverAddr string if *disableDirectPath { serverAddr = cloudPathServerAddr } else { serverAddr = directPathServerAddr } log.Printf("serverAddr: %s", serverAddr) if *methodsInput != "" { log.Printf("Methods input received: %s", *methodsInput) methodList := strings.Split(*methodsInput, ",") for _, method := range methodList { method = strings.TrimSpace(method) if _, exists := methods[method]; !exists { log.Fatalf("Invalid method specified: %s. Available methods are: EmptyCall, UnaryCall, StreamingInputCall, StreamingOutputCall, FullDuplexCall, HalfDuplexCall", method) } methods[method] = true log.Printf("Enabled method: %s", method) } } else { methods["EmptyCall"] = true log.Println("No methods input received.default EmptyCall") } log.Println("Setting up OpenTelemetry...") opts, err := setupOpenTelemetry() if err != nil { log.Fatalf("Failed to set up OpenTelemetry: %v", err) } log.Println("OpenTelemetry setup completed.") opts = append(opts, grpc.WithCredentialsBundle(google.NewDefaultCredentials())) log.Println("Attempting to create gRPC connection...") conn, err := grpc.NewClient(serverAddr, opts...) if err != nil { log.Fatalf("Failed to connect to gRPC server %v", err) } log.Println("Successfully connected to gRPC server") defer conn.Close() stub := test.NewTestServiceClient(conn) log.Println("gRPC client stub created.") if methods["EmptyCall"] { go executeMethod("EmptyCall", ExecuteEmptyCalls, stub) log.Println("EmptyCall method started in background") } if methods["UnaryCall"] { go executeMethod("UnaryCall", ExecuteUnaryCalls, stub) log.Println("UnaryCall method started in background") } if methods["StreamingInputCall"] { go executeMethod("StreamingInputCall", ExecuteStreamingInputCalls, stub) log.Println("StreamingInputCall method started in background") } if methods["StreamingOutputCall"] { go executeMethod("StreamingOutputCall", ExecuteStreamingOutputCalls, stub) log.Println("StreamingOutputCall method started in background") } if methods["FullDuplexCall"] { go executeMethod("FullDuplexCall", ExecuteFullDuplexCalls, stub) log.Println("FullDuplexCall method started in background") } if methods["HalfDuplexCall"] { go executeMethod("HalfDuplexCall", ExecuteHalfDuplexCalls, stub) log.Println("HalfDuplexCall method started in background") } forever := make(chan struct{}) <-forever log.Println("All test cases completed.") }