e2etestrunner/testclient/testclient.go (94 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package testclient import ( "context" "fmt" "log" "strconv" "cloud.google.com/go/pubsub" "github.com/GoogleCloudPlatform/opentelemetry-operations-e2e-testing/e2etestrunner/setuptf" "google.golang.org/genproto/googleapis/rpc/code" ) const ( TestID string = "test_id" Scenario string = "scenario" StatusCode string = "status_code" Health string = "/health" ) type Request struct { // name of the scenario to run Scenario string TestID string Headers map[string]string } type Response struct { StatusCode code.Code Headers map[string]string } type Client struct { pubsubClient *pubsub.Client requestTopic *pubsub.Topic responseSubscription *pubsub.Subscription } func New(ctx context.Context, projectID string, pubsubInfo *setuptf.PubsubInfo) (*Client, error) { pubsub, err := pubsub.NewClient(ctx, projectID) if err != nil { return nil, err } client := &Client{ pubsubClient: pubsub, requestTopic: pubsub.Topic(pubsubInfo.RequestTopic.TopicName), responseSubscription: pubsub.Subscription(pubsubInfo.ResponseTopic.SubscriptionName), } // Disable buffering client.requestTopic.PublishSettings.CountThreshold = 1 return client, nil } func (c *Client) Request( ctx context.Context, request Request, ) (*Response, error) { attributes := map[string]string{TestID: request.TestID, "scenario": request.Scenario} for k, v := range request.Headers { attributes[k] = v } pubResult := c.requestTopic.Publish(ctx, &pubsub.Message{ Attributes: attributes, }) messageID, err := pubResult.Get(ctx) if err != nil { return nil, err } var ( res *Response resErr error ) cctx, cancel := context.WithCancel(ctx) err = c.responseSubscription.Receive(cctx, func(ctx context.Context, message *pubsub.Message) { if testID := message.Attributes[TestID]; testID == request.TestID { message.Ack() codeInt, err := strconv.Atoi(message.Attributes[StatusCode]) if err != nil { resErr = fmt.Errorf(`response pub/sub message invalid attribute %q: %v, message: %v`, StatusCode, err, message) } else { res = &Response{StatusCode: code.Code(codeInt), Headers: message.Attributes} } cancel() } else { message.Nack() } }) if err != nil { return nil, err } else if resErr != nil { return nil, resErr } else if res == nil { // Can happen if cctx times out return nil, fmt.Errorf( "sent message ID %v, but never received a response on subscription %v", messageID, c.responseSubscription.String(), ) } return res, nil } // Call in TestMain() to block until the test server is ready for requests. Uses // a *log.Logger because this runs before testing.T is available func (c *Client) WaitForHealth(ctx context.Context, logger *log.Logger) error { _, err := c.Request(ctx, Request{Scenario: Health}) return err }