func()

in e2etestrunner/testclient/testclient.go [68:118]


// Request publishes a Pub/Sub message on the client's request topic and waits
// for the matching response on the client's response subscription.
//
// The published message has no payload; it carries the request's TestID and
// Scenario as attributes, plus every entry of request.Headers (a header with
// the same key as one of those two attributes overrides it).
//
// A received message is treated as the response when its TestID attribute
// equals request.TestID. That message is acked and its StatusCode attribute is
// parsed as an integer status code; all of its attributes are returned as the
// response headers. Messages for other test IDs are nacked.
//
// An error is returned if publishing fails, if receiving fails, if the
// response's StatusCode attribute is not an integer, or if receiving ends
// without a matching message having been seen.
func (c *Client) Request(
	ctx context.Context,
	request Request,
) (*Response, error) {
	attributes := map[string]string{TestID: request.TestID, "scenario": request.Scenario}
	for k, v := range request.Headers {
		attributes[k] = v
	}
	pubResult := c.requestTopic.Publish(ctx, &pubsub.Message{
		Attributes: attributes,
	})
	// Block until the publish has completed so a failure is reported before
	// waiting for a response.
	messageID, err := pubResult.Get(ctx)
	if err != nil {
		return nil, err
	}

	var (
		res    *Response
		resErr error
	)
	// cctx lets the callback stop Receive as soon as the response is found.
	cctx, cancel := context.WithCancel(ctx)
	// Always release the derived context, including when Receive fails or
	// returns without a matching message; calling cancel twice is harmless.
	defer cancel()

	// NOTE(review): the Pub/Sub client may invoke this callback concurrently.
	// If more than one message can carry the same test ID, the writes to res
	// and resErr need synchronization — confirm against the test setup.
	err = c.responseSubscription.Receive(cctx, func(_ context.Context, message *pubsub.Message) {
		if message.Attributes[TestID] != request.TestID {
			// Not ours; nack it rather than acking it on another test's behalf.
			message.Nack()
			return
		}

		message.Ack()
		codeInt, convErr := strconv.Atoi(message.Attributes[StatusCode])
		if convErr != nil {
			resErr = fmt.Errorf(`response pub/sub message invalid attribute %q: %w, message: %v`, StatusCode, convErr, message)
		} else {
			res = &Response{StatusCode: code.Code(codeInt), Headers: message.Attributes}
		}
		cancel()
	})

	switch {
	case err != nil:
		return nil, err
	case resErr != nil:
		return nil, resErr
	case res == nil:
		// Receive returned without an error but no matching message was
		// handled, e.g. because ctx ended before the response arrived.
		return nil, fmt.Errorf(
			"sent message ID %v, but never received a response on subscription %v",
			messageID,
			c.responseSubscription.String(),
		)
	}

	return res, nil
}