bench/load/cancellation/workflow.go (269 lines of code) (raw):
// Copyright (c) 2017-2021 Uber Technologies Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package cancellation
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/pborman/uuid"
"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/client"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
"github.com/uber/cadence/bench/lib"
"github.com/uber/cadence/bench/load/common"
)
// Identifiers other packages use to run the cancellation load test.
const (
	// TestName is the test name for cancellation test
	TestName = "cancellation"
	// LauncherWorkflowName is the workflow name for launching cancellation load test
	LauncherWorkflowName = "cancellation-load-test-workflow"
)

// Internal names and tuning values of the cancellation load test.
const (
	// sleepWorkflowName is the registered name of the workflow that is started and then cancelled.
	sleepWorkflowName = "cancellation-sleep-workflow"

	// Start and cancel calls are paced by waitDurationInMilliSeconds plus a random
	// jitter in [0, waitDurationJitterInMilliSeconds) milliseconds.
	waitDurationInMilliSeconds       = 75
	waitDurationJitterInMilliSeconds = 50

	// minActivityTimeout is the lower bound for the launcher activity's start-to-close timeout.
	minActivityTimeout = 60 * time.Second

	// defaultDurationBeforeCancellation is how long the launcher activity lets
	// workflows start before it begins cancelling them.
	defaultDurationBeforeCancellation = time.Second * 10

	// defaultWorkflowSleepDuration is how long each sleep workflow sleeps.
	defaultWorkflowSleepDuration = time.Second * 20

	// defaultDurationBeforeValidation is how long the launcher workflow waits
	// before checking visibility records.
	defaultDurationBeforeValidation = time.Second * 30
)

// launcherActivityResult is what each launcherActivity reports back: the
// fraction of successful StartWorkflow and CancelWorkflow calls.
type launcherActivityResult struct {
	StartAvailability  float64
	CancelAvailability float64
}

// startWorkflowProgress is the heartbeat payload of startWorkflow. It records
// call counters and the index of the next workflow to start, so a retried
// activity can resume from NextStartID.
type startWorkflowProgress struct {
	TotalStartWorkflowCall     int
	SucceededStartWorkflowCall int
	WorkflowStarted            int
	NextStartID                int
}
// RegisterLauncher registers the launcher workflow (under LauncherWorkflowName)
// and the launcher and validation activities it schedules on the given worker.
func RegisterLauncher(w worker.Worker) {
	launcherOpts := workflow.RegisterOptions{Name: LauncherWorkflowName}
	w.RegisterWorkflowWithOptions(launcherWorkflow, launcherOpts)
	for _, activityFn := range []interface{}{launcherActivity, validationActivity} {
		w.RegisterActivity(activityFn)
	}
}
// RegisterWorker registers, under sleepWorkflowName, the sleep workflow that
// the cancellation test starts and then cancels.
func RegisterWorker(w worker.Worker) {
	sleepOpts := workflow.RegisterOptions{Name: sleepWorkflowName}
	w.RegisterWorkflowWithOptions(sleepWorkflow, sleepOpts)
}
// launcherWorkflow drives the cancellation load test.
//
// It fans out config.Concurrency launcherActivity executions. Each one starts
// and cancels config.TotalLaunchCount/config.Concurrency sleep workflows.
// The workflow fails if the average start or cancel availability reported by
// those activities is below common.DefaultAvailabilityThreshold. Otherwise it
// sleeps for defaultDurationBeforeValidation and runs validationActivity over
// the visibility records of workflows started after this workflow began.
func launcherWorkflow(
	ctx workflow.Context,
	config lib.CancellationTestConfig,
) error {
	// Concurrency is used as a divisor below; reject invalid values up front
	// instead of panicking inside workflow code.
	if config.Concurrency <= 0 {
		return fmt.Errorf("invalid concurrency: %v, must be greater than 0", config.Concurrency)
	}

	workflowPerActivity := config.TotalLaunchCount / config.Concurrency
	// Each activity starts about 1000/waitDurationInMilliSeconds workflows per second.
	// The result is scaled by the retry count, presumably to leave room for retried calls.
	activityStartToCloseTimeout := time.Duration(workflowPerActivity/(1000/waitDurationInMilliSeconds)) * time.Second * common.DefaultMaxRetryCount
	if activityStartToCloseTimeout < minActivityTimeout {
		activityStartToCloseTimeout = minActivityTimeout
	}
	launcherActivityOptions := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    activityStartToCloseTimeout,
		HeartbeatTimeout:       30 * time.Second,
		RetryPolicy: &cadence.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2,
			MaximumAttempts:    10,
		},
	}
	ctx = workflow.WithActivityOptions(ctx, launcherActivityOptions)
	startTime := workflow.Now(ctx)

	futures := make([]workflow.Future, 0, config.Concurrency)
	for i := 0; i < config.Concurrency; i++ {
		futures = append(futures, workflow.ExecuteActivity(ctx, launcherActivity, config))
	}

	avgStartAvailability := 0.0
	avgCancelAvailability := 0.0
	for _, future := range futures {
		var result launcherActivityResult
		if err := future.Get(ctx, &result); err != nil {
			return fmt.Errorf("launcherActivity failed: %v", err)
		}
		avgStartAvailability += result.StartAvailability
		avgCancelAvailability += result.CancelAvailability
	}
	avgStartAvailability /= float64(config.Concurrency)
	avgCancelAvailability /= float64(config.Concurrency)

	// validate availability
	if avgStartAvailability < common.DefaultAvailabilityThreshold {
		return fmt.Errorf("startWorkflow availability too low, required: %v, actual: %v", common.DefaultAvailabilityThreshold, avgStartAvailability)
	}
	if avgCancelAvailability < common.DefaultAvailabilityThreshold {
		return fmt.Errorf("cancelWorkflow availability too low, required: %v, actual: %v", common.DefaultAvailabilityThreshold, avgCancelAvailability)
	}

	// validate if there's stuck workflow using visibility records
	// give the system some time to propagate ES records and wait for workflow that failed to cancel to finish
	if err := workflow.Sleep(ctx, defaultDurationBeforeValidation); err != nil {
		return fmt.Errorf("launcher workflow sleep failed: %v", err)
	}

	validationActivityOptions := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
		RetryPolicy: &cadence.RetryPolicy{
			InitialInterval:          time.Second,
			BackoffCoefficient:       2,
			MaximumAttempts:          5,
			NonRetriableErrorReasons: []string{common.ErrReasonValidationFailed},
		},
	}
	ctx = workflow.WithActivityOptions(ctx, validationActivityOptions)

	// move startTime backward by 10 secs to account for the time drift between worker and cadence hosts if any
	testStartTimeNanos := startTime.Add(-10 * time.Second).UnixNano()
	// validationActivity declares a *lib.CancellationTestConfig parameter, so pass a
	// pointer. The client may reject arguments whose type is not assignable to the
	// activity's parameter type. With the default JSON data converter the encoded
	// input is the same as for the value.
	return workflow.ExecuteActivity(ctx, validationActivity, &config, testStartTimeNanos).Get(ctx, nil)
}
// launcherActivity starts and cancels this activity's share of sleep workflows.
//
// startWorkflow runs in a background goroutine and streams the IDs of started
// workflows over workflowIDChan. The channel buffer holds roughly
// defaultDurationBeforeCancellation worth of starts. After that delay, this
// goroutine cancels the workflows via cancelWorkflow. The result carries the
// availability measured by both helpers; no error is ever returned.
func launcherActivity(
	ctx context.Context,
	config lib.CancellationTestConfig,
) (launcherActivityResult, error) {
	if config.ContextTimeoutInSeconds == 0 {
		config.ContextTimeoutInSeconds = int(common.DefaultContextTimeout.Seconds())
	}

	startRPS := 1000 / waitDurationInMilliSeconds
	workflowIDChan := make(chan string, startRPS*int(defaultDurationBeforeCancellation.Seconds()))

	// The start result is passed back over a channel, not a shared variable.
	// startWorkflow closes workflowIDChan in a defer, before its return value
	// can be assigned, so cancelWorkflow may finish and the result be read before
	// the goroutine writes it. The channel is buffered so the send never blocks.
	startResult := make(chan float64, 1)
	go func() {
		startResult <- startWorkflow(ctx, &config, workflowIDChan)
	}()

	// Give the starter a head start, but stop waiting if the activity is cancelled or times out.
	select {
	case <-time.After(defaultDurationBeforeCancellation):
	case <-ctx.Done():
	}
	cancelAvailability := cancelWorkflow(ctx, &config, workflowIDChan)

	// cancelWorkflow returns early when ctx is done. Keep draining IDs in the
	// background so startWorkflow can never block on a full channel. The drain
	// ends when startWorkflow closes the channel. After a normal return the
	// channel is already closed and this goroutine exits immediately.
	go func() {
		for range workflowIDChan {
		}
	}()

	var startAvailability float64
	select {
	case startAvailability = <-startResult:
	case <-ctx.Done():
		// The activity has timed out or been cancelled; the value returned is irrelevant.
	}

	return launcherActivityResult{
		StartAvailability:  startAvailability,
		CancelAvailability: cancelAvailability,
	}, nil
}
// startWorkflow starts config.TotalLaunchCount/config.Concurrency sleep workflows.
//
// Starts are paced at waitDurationInMilliSeconds plus random jitter, and each
// workflow goes to a randomly chosen task list. The ID of every workflow that
// was started (or was already started) is sent on workflowIDChan. That channel
// is closed when this function returns. Progress is heartbeated after each
// workflow, so a retried activity resumes from NextStartID.
//
// The return value is the fraction of StartWorkflow calls that succeeded.
// Service-busy responses count as successes, and 1 is returned if no call was
// made. If ctx is done before all workflows are started, 0 is returned; the
// activity times out in that case, so the value is irrelevant.
func startWorkflow(
	ctx context.Context,
	config *lib.CancellationTestConfig,
	workflowIDChan chan string,
) float64 {
	defer close(workflowIDChan)

	logger := activity.GetLogger(ctx)
	var progress startWorkflowProgress
	if activity.HasHeartbeatDetails(ctx) {
		if err := activity.GetHeartbeatDetails(ctx, &progress); err != nil {
			// explicitly reset to the zero value if failed to get details, in case the implementation changed the value
			logger.Error("Failed to get activity heartbeat details", zap.Error(err))
			progress = startWorkflowProgress{}
		}
	}

	cc := ctx.Value(lib.CtxKeyCadenceClient).(lib.CadenceClient)
	rc := ctx.Value(lib.CtxKeyRuntimeContext).(*lib.RuntimeContext)
	numTaskList := rc.Bench.NumTaskLists
	for i := progress.NextStartID; i < config.TotalLaunchCount/config.Concurrency; i++ {
		select {
		case <-ctx.Done():
			// unable to start specified # of workflows in time.
			// Workflow will receive timeout error, the value we return here is irrelevant.
			logger.Error("Failed to start all workflows in time", zap.Any("progress", progress))
			return 0
		default:
		}

		workflowID := fmt.Sprintf("%s-%s", TestName, uuid.New())
		startWorkflowOptions := client.StartWorkflowOptions{
			ExecutionStartToCloseTimeout: 2 * defaultWorkflowSleepDuration,
			ID:                           workflowID,
			TaskList:                     common.GetTaskListName(rand.Intn(numTaskList)),
		}
		if err := common.RetryOp(func() error {
			progress.TotalStartWorkflowCall++
			startCtx, cancel := context.WithTimeout(ctx, time.Duration(config.ContextTimeoutInSeconds)*time.Second)
			_, err := cc.StartWorkflow(startCtx, startWorkflowOptions, sleepWorkflowName, defaultWorkflowSleepDuration)
			cancel()
			if err == nil || cadence.IsWorkflowExecutionAlreadyStartedError(err) {
				// Forward the ID we chose, not the returned execution. The execution
				// is not guaranteed to be non-nil when an error
				// (WorkflowExecutionAlreadyStarted) is returned.
				select {
				case workflowIDChan <- workflowID:
				case <-ctx.Done():
					// The consumer may have stopped reading; don't block forever.
				}
				progress.SucceededStartWorkflowCall++
				return nil
			}
			if common.IsServiceBusyError(err) {
				// do not count service busy as failure
				progress.SucceededStartWorkflowCall++
			}
			logger.Error("Failed to start workflow execution", zap.Error(err))
			return err
		}, nil); err == nil {
			progress.WorkflowStarted++
		}

		// successfully started the workflow or gave up after several retries
		progress.NextStartID++
		activity.RecordHeartbeat(ctx, progress)
		time.Sleep(time.Duration(waitDurationInMilliSeconds+rand.Intn(waitDurationJitterInMilliSeconds)) * time.Millisecond)
	}

	// Avoid 0/0 = NaN, which would poison the launcher's average and make its
	// threshold comparison always false.
	availability := 1.0
	if progress.TotalStartWorkflowCall > 0 {
		availability = float64(progress.SucceededStartWorkflowCall) / float64(progress.TotalStartWorkflowCall)
	}
	logger.Info("Completed start workflow", zap.Float64("availability", availability), zap.Int("workflow-started", progress.WorkflowStarted))
	return availability
}
// cancelWorkflow requests cancellation of every workflow ID received on workflowIDChan.
//
// Calls are paced at waitDurationInMilliSeconds plus random jitter. It returns
// once workflowIDChan is closed, with the fraction of CancelWorkflow calls
// that succeeded. Already-requested and not-exists responses count as
// success, service-busy responses are not counted as failures, and 1 is
// returned if no call was made. If ctx is done first, 0 is returned; the
// activity times out in that case, so the value is irrelevant.
func cancelWorkflow(
	ctx context.Context,
	config *lib.CancellationTestConfig,
	workflowIDChan chan string,
) float64 {
	logger := activity.GetLogger(ctx)
	totalCancelWorkflowCall := 0
	succeededCancelWorkflowCall := 0
	workflowCancelled := 0
	cc := ctx.Value(lib.CtxKeyCadenceClient).(lib.CadenceClient)
	for {
		select {
		case <-ctx.Done():
			// unable to cancel all workflows in time.
			// Workflow will receive timeout error, the value we return here is irrelevant.
			logger.Error("Failed to cancel all workflows in time")
			return 0
		case workflowID, ok := <-workflowIDChan:
			if !ok {
				// channel has been closed by the producer: every started workflow has been handled.
				// Avoid 0/0 = NaN when nothing was sent.
				availability := 1.0
				if totalCancelWorkflowCall > 0 {
					availability = float64(succeededCancelWorkflowCall) / float64(totalCancelWorkflowCall)
				}
				logger.Info("Completed cancel workflow", zap.Float64("availability", availability), zap.Int("workflow-cancelled", workflowCancelled))
				return availability
			}
			if err := common.RetryOp(func() error {
				totalCancelWorkflowCall++
				cancelCtx, cancel := context.WithTimeout(ctx, time.Duration(config.ContextTimeoutInSeconds)*time.Second)
				err := cc.CancelWorkflow(cancelCtx, workflowID, "")
				cancel()
				if err == nil || common.IsCancellationAlreadyRequestedError(err) || common.IsEntityNotExistsError(err) {
					succeededCancelWorkflowCall++
					return nil
				}
				if common.IsServiceBusyError(err) {
					// do not count service busy as failure
					succeededCancelWorkflowCall++
				}
				logger.Error("Failed to cancel workflow execution", zap.Error(err))
				return err
			}, nil); err == nil {
				workflowCancelled++
			}
			time.Sleep(time.Duration(waitDurationInMilliSeconds+rand.Intn(waitDurationJitterInMilliSeconds)) * time.Millisecond)
		}
	}
}
// validationActivity checks visibility records for sleep workflows started after
// testStartTimeNanos, in the activity's workflow domain.
//
// It returns a common.ErrReasonValidationFailed custom error when:
//  1. fewer than config.TotalLaunchCount executions were recorded, or
//  2. not every recorded execution has closed.
//
// Errors from CountWorkflow itself are returned unchanged.
func validationActivity(
	ctx context.Context,
	config *lib.CancellationTestConfig,
	testStartTimeNanos int64,
) error {
	cc := ctx.Value(lib.CtxKeyCadenceClient).(lib.CadenceClient)
	domain := activity.GetInfo(ctx).WorkflowDomain

	// countMatching returns the number of executions in domain matching query.
	countMatching := func(query string) (int64, error) {
		resp, err := cc.CountWorkflow(ctx, &shared.CountWorkflowExecutionsRequest{
			Domain: &domain,
			Query:  &query,
		})
		if err != nil {
			return 0, err
		}
		return resp.GetCount(), nil
	}

	// 1. enough workflows must have been started
	started, err := countMatching(fmt.Sprintf("WorkflowType = '%s' and StartTime > %v", sleepWorkflowName, testStartTimeNanos))
	if err != nil {
		return err
	}
	if started < int64(config.TotalLaunchCount) {
		return cadence.NewCustomError(common.ErrReasonValidationFailed, fmt.Sprintf("Expected to start %v workflow, actual started: %v", config.TotalLaunchCount, started))
	}

	// 2. every started workflow must be closed
	closed, err := countMatching(fmt.Sprintf("WorkflowType = '%s' and StartTime > %v and CloseTime != missing", sleepWorkflowName, testStartTimeNanos))
	if err != nil {
		return err
	}
	if closed != started {
		return cadence.NewCustomError(common.ErrReasonValidationFailed, fmt.Sprintf("Not all workflows are closed, started: %v, closed: %v", started, closed))
	}

	// TODO: enable the following check after cancellation test is rewritten.
	// Currently, if launcherActivity fails midway (e.g. heartbeat timeout), every
	// workflow ID held in memory is lost, and those workflows can't be cancelled.
	// 3. check if enough workflows are cancelled
	// cancelled, err := countMatching(fmt.Sprintf("WorkflowType = '%s' and StartTime > %v and CloseStatus = 2", sleepWorkflowName, testStartTimeNanos))
	// if err != nil {
	// 	return err
	// }
	// if cancelled < int64(float64(started)*common.DefaultAvailabilityThreshold) {
	// 	return cadence.NewCustomError(common.ErrReasonValidationFailed, fmt.Sprintf("Cancelled workflow count too low, started: %v, cancelled: %v", started, cancelled))
	// }
	return nil
}
// sleepWorkflow is the workflow that the load test starts and later cancels.
// It sleeps for sleepDuration via workflow.Sleep and returns the error that
// workflow.Sleep reports, if any.
func sleepWorkflow(ctx workflow.Context, sleepDuration time.Duration) error {
	if err := workflow.Sleep(ctx, sleepDuration); err != nil {
		return err
	}
	return nil
}