in e2etestrunner/testclient/testclient.go [68:118]
// Request publishes a message to the request topic and waits on the response
// subscription for the message that answers it.
//
// The published message has no payload; it carries request.TestID (under the
// TestID attribute), request.Scenario (under "scenario") and every entry of
// request.Headers as attributes. A header whose key collides with one of the
// two built-in attributes overrides it.
//
// Responses are matched on the TestID attribute. The matching message is
// acked and converted to a Response whose StatusCode is parsed from the
// StatusCode attribute and whose Headers are the message's full attribute
// map. Messages for other test IDs are nacked.
//
// An error is returned if publishing fails, if receiving fails, if the
// matching message has a non-integer StatusCode attribute, or if receiving
// stops before any matching message was seen.
func (c *Client) Request(
	ctx context.Context,
	request Request,
) (*Response, error) {
	attributes := map[string]string{TestID: request.TestID, "scenario": request.Scenario}
	for k, v := range request.Headers {
		attributes[k] = v
	}

	pubResult := c.requestTopic.Publish(ctx, &pubsub.Message{
		Attributes: attributes,
	})
	messageID, err := pubResult.Get(ctx)
	if err != nil {
		return nil, err
	}

	var (
		res    *Response
		resErr error
	)

	// The derived context lets the callback stop Receive as soon as the
	// matching response shows up. The deferred cancel releases the context on
	// every other exit path (Receive error, parent context done); calling
	// cancel more than once is harmless.
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// NOTE(review): the pubsub client may run this callback on several
	// goroutines at once; res/resErr are written without synchronization, so
	// duplicate responses for the same test ID could race — confirm whether
	// that can happen in practice.
	err = c.responseSubscription.Receive(cctx, func(_ context.Context, message *pubsub.Message) {
		if message.Attributes[TestID] != request.TestID {
			// Not ours: hand it back.
			message.Nack()
			return
		}
		message.Ack()

		codeInt, convErr := strconv.Atoi(message.Attributes[StatusCode])
		if convErr != nil {
			resErr = fmt.Errorf(`response pub/sub message invalid attribute %q: %w, message: %v`, StatusCode, convErr, message)
		} else {
			res = &Response{StatusCode: code.Code(codeInt), Headers: message.Attributes}
		}
		// We have our answer (or a definitive parse failure); stop receiving.
		cancel()
	})

	switch {
	case err != nil:
		return nil, err
	case resErr != nil:
		return nil, resErr
	case res == nil:
		// Receive returned without error and without a match, e.g. because
		// ctx was cancelled or timed out before the response arrived.
		return nil, fmt.Errorf(
			"sent message ID %v, but never received a response on subscription %v",
			messageID,
			c.responseSubscription.String(),
		)
	}
	return res, nil
}