client/concurrency.go (134 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"log"
"sync"
"time"
"github.com/GoogleCloudPlatform/functions-framework-conformance/events"
)
func timeExecution(fn func() error) (time.Duration, error) {
start := time.Now()
err := fn()
return time.Since(start), err
}
// validateConcurrency validates a server can handle concurrent requests by
// valdating that the response time for a single request does not increase
// linearly with n concurrent requests, given a function that:
// 1. Is not CPU-bound (e.g. sleeps)
// 2. Executes for at least 1s to ensure non-trivial measurement differences
func validateConcurrency(url string, functionType string) error {
log.Printf("%s validation with concurrent requests...", functionType)
var sendFn func() error
switch functionType {
case "http", "typed":
// Arbitrary JSON payload for compatibility with 'http' and typed JSON tests
sendFn = func() error {
_, err := sendHTTP(url, []byte(`{"data": "hello"}`))
return err
}
case "cloudevent":
// Arbitrary payload that conforms to CloudEvent schema
sendFn = func() error {
return send(url, events.CloudEvent, []byte(`{
"specversion": "1.0",
"type": "google.firebase.auth.user.v1.created",
"source": "//firebaseauth.googleapis.com/projects/my-project-id",
"subject": "users/UUpby3s4spZre6kHsgVSPetzQ8l2",
"id": "aaaaaa-1111-bbbb-2222-cccccccccccc",
"time": "2020-09-29T11:32:00.123Z",
"datacontenttype": "application/json",
"data": {
"email": "test@nowhere.com",
"metadata": {
"createTime": "2020-05-26T10:42:27Z",
"lastSignInTime": "2020-10-24T11:00:00Z"
},
"providerData": [
{
"email": "test@nowhere.com",
"providerId": "password",
"uid": "test@nowhere.com"
}
],
"uid": "UUpby3s4spZre6kHsgVSPetzQ8l2"
}
}`))
}
case "legacyevent":
// Arbitrary payload that conforms to Background event schema
sendFn = func() error {
return send(url, events.LegacyEvent, []byte(`{
"data": {
"email": "test@nowhere.com",
"metadata": {
"createdAt": "2020-05-26T10:42:27Z",
"lastSignedInAt": "2020-10-24T11:00:00Z"
},
"providerData": [
{
"email": "test@nowhere.com",
"providerId": "password",
"uid": "test@nowhere.com"
}
],
"uid": "UUpby3s4spZre6kHsgVSPetzQ8l2"
},
"eventId": "aaaaaa-1111-bbbb-2222-cccccccccccc",
"eventType": "providers/firebase.auth/eventTypes/user.create",
"notSupported": {
},
"resource": "projects/my-project-id",
"timestamp": "2020-09-29T11:32:00.123Z"
}`))
}
default:
return fmt.Errorf("expected type to be one of 'http', 'cloudevent', or 'legacyevent', got %s", functionType)
}
if err := sendConcurrentRequests(sendFn); err != nil {
return err
}
log.Printf("Concurrency validation passed!")
return nil
}
func sendConcurrentRequests(sendFn func() error) error {
// Get a benchmark for the time it takes for a single request
singleReqTime, singleReqErr := timeExecution(func() error {
return sendFn()
})
if singleReqErr != nil {
return fmt.Errorf("concurrent validation unable to send single request to benchmark response time: %v", singleReqErr)
}
minWait := 1 * time.Second
if singleReqTime < minWait {
return fmt.Errorf("concurrent validation requires a function that waits at least %s before responding, function responded in %s", minWait, singleReqTime)
}
log.Printf("Single request response time benchmarked, took %s for 1 request", singleReqTime)
// Get a benchmark for the time it takes for concurrent requests
const numConReqs = 10
log.Printf("Starting %d concurrent workers to send requests", numConReqs)
type workerResponse struct {
id int
err error
}
var wg sync.WaitGroup
respCh := make(chan workerResponse, numConReqs)
conReqTime, _ := timeExecution(func() error {
for i := 0; i < numConReqs; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
err := sendFn()
respCh <- workerResponse{id: id, err: err}
}(i)
}
wg.Wait()
return nil
})
maybeErrMessage := ""
for i := 0; i < numConReqs; i++ {
resp := <-respCh
if resp.err != nil {
maybeErrMessage += fmt.Sprintf("error #%d: %v\n", i, resp.err)
} else {
log.Printf("Worker #%d done", resp.id)
}
}
if maybeErrMessage != "" {
return fmt.Errorf("at least one concurrent request failed:\n%s", maybeErrMessage)
}
// Validate that the concurrent requests were handled faster than if all
// the requests were handled serially, using the single request time
// as a benchmark. Some buffer is provided by doubling the single request time.
if conReqTime > 2*singleReqTime {
return fmt.Errorf("function took too long to complete %d concurrent requests. %d concurrent request time: %s, single request time: %s", numConReqs, numConReqs, conReqTime, singleReqTime)
}
log.Printf("Concurrent request response time benchmarked, took %s for %d requests", conReqTime, numConReqs)
return nil
}