bench/load/cron/workflow.go (278 lines of code) (raw):

// Copyright (c) 2017-2021 Uber Technologies Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. 
package cron

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"go.uber.org/cadence"
	"go.uber.org/cadence/activity"
	"go.uber.org/cadence/client"
	"go.uber.org/cadence/worker"
	"go.uber.org/cadence/workflow"
	"go.uber.org/zap"

	"github.com/uber/cadence/bench/lib"
	"github.com/uber/cadence/bench/load/basic"
	"github.com/uber/cadence/bench/load/cancellation"
	"github.com/uber/cadence/bench/load/common"
	"github.com/uber/cadence/bench/load/concurrentexec"
	"github.com/uber/cadence/bench/load/signal"
	"github.com/uber/cadence/bench/load/timer"
)

const (
	// TestName is the test name for cron bench test
	TestName = "cron"
)

const (
	// cronWorkflowName is the registered name of the top-level cron workflow.
	cronWorkflowName = "cron-test-workflow"
	// cronLauncherWorkflowName is the registered name of the per-test-suite
	// launcher workflow.
	cronLauncherWorkflowName = "cron-launcher-workflow"

	// queryTypeTestResults is the query type served by the cron workflow to
	// expose the test results collected so far.
	queryTypeTestResults = "test-results"
	// testResultSignalName is the signal used by launcher workflows to report
	// a single test result back to the cron workflow.
	testResultSignalName = "test-result"
	// testPassedBoolSearchAttribute is the search attribute upserted by the
	// cron workflow once all results have been received.
	testPassedBoolSearchAttribute = "Passed"
)

type (
	// workflowExecution identifies a workflow run; it is passed to the
	// launcher workflow so results can be signaled back to the cron run that
	// started it.
	workflowExecution struct {
		WorkflowID string
		RunID      string
	}

	// testResult is the outcome of a single test, sent as the payload of the
	// test-result signal and returned from the test-results query.
	testResult struct {
		Name        string
		Description string
		TestStatus  string // testStatusPassed or testStatusFailed
		Details     string
	}
)

const (
	testStatusPassed = "Passed"
	testStatusFailed = "Failed"
)

// RegisterLauncher registers workflows for cron load launching
func RegisterLauncher(w worker.Worker) {
	w.RegisterWorkflowWithOptions(cronWorkflow, workflow.RegisterOptions{Name: cronWorkflowName})
	w.RegisterWorkflowWithOptions(launcherWorkflow, workflow.RegisterOptions{Name: cronLauncherWorkflowName})
	w.RegisterActivity(launcherActivity)
	w.RegisterActivity(signalResultActivity)
}

// cronWorkflow starts one launcher workflow per configured test suite (via
// launcherActivity), then blocks until it has received one test-result signal
// for every test in every suite. Received results are exposed through the
// test-results query, and the overall pass/fail outcome is upserted as the
// "Passed" search attribute before the workflow completes.
//
// It returns an error if the workflow profile cannot be started, if any
// launcher activity fails, or if the search attribute upsert fails.
func cronWorkflow(
	ctx workflow.Context,
	config lib.CronTestConfig,
) error {
	now := workflow.Now(ctx).UnixNano()
	profile, err := lib.BeginWorkflow(ctx, cronWorkflowName, now)
	if err != nil {
		return err
	}

	workflowTimeoutInSeconds := workflow.GetInfo(ctx).ExecutionStartToCloseTimeoutSeconds
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
		RetryPolicy: &cadence.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2,
			// retry the activity until workflow timeout
			ExpirationInterval: time.Duration(workflowTimeoutInSeconds) * time.Second,
		},
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	// Schedule all launcher activities first so the suites start in parallel,
	// then wait for every one of them to finish.
	totalTests := 0
	futures := make([]workflow.Future, 0, len(config.TestSuites))
	for _, testSuite := range config.TestSuites {
		totalTests += len(testSuite.Configs)
		testSuiteName := testSuite.Name
		future := workflow.ExecuteActivity(
			ctx,
			launcherActivity,
			testSuite.Domain,
			testSuiteName,
			testSuite.Configs,
			workflowTimeoutInSeconds,
		)
		futures = append(futures, future)
	}

	for _, future := range futures {
		if err := future.Get(ctx, nil); err != nil {
			return profile.End(err)
		}
	}

	testResults := make([]testResult, 0, totalTests)
	// The handler registration error is deliberately ignored.
	// NOTE(review): presumably it can only fail for an invalid handler
	// signature, which is fixed at compile time here — confirm.
	_ = workflow.SetQueryHandler(ctx, queryTypeTestResults, func() ([]testResult, error) {
		return testResults, nil
	})

	// Collect exactly one signal per test; a single failed test marks the
	// whole run as failed.
	testPassed := true
	testResultCh := workflow.GetSignalChannel(ctx, testResultSignalName)
	for len(testResults) != totalTests {
		var result testResult
		testResultCh.Receive(ctx, &result)
		testResults = append(testResults, result)
		if result.TestStatus == testStatusFailed {
			testPassed = false
		}
	}

	err = workflow.UpsertSearchAttributes(ctx, map[string]interface{}{testPassedBoolSearchAttribute: testPassed})
	return profile.End(err)
}

// launcherActivity starts the launcher workflow for a single test suite in
// the given domain. The launcher is handed the domain and execution of the
// workflow that scheduled this activity so it can signal results back.
//
// A "workflow execution already started" response is treated as success, for
// example when a retried activity attempt finds the launcher it started on a
// previous attempt. Any other error remaining after retries is returned.
func launcherActivity(
	ctx context.Context,
	domain string,
	testSuiteName string,
	testConfigs []lib.AggregatedTestConfig,
	timeoutInSeconds int32,
) error {
	logger := activity.GetLogger(ctx)

	rc := ctx.Value(lib.CtxKeyRuntimeContext).(*lib.RuntimeContext)
	cc, err := lib.NewCadenceClientForDomain(rc, domain)
	if err != nil {
		logger.Error("Failed to create cadence client", zap.String("domain", domain), zap.Error(err))
		return err
	}

	workflowOptions := client.StartWorkflowOptions{
		ID:                              "cron-launcher-" + testSuiteName,
		TaskList:                        common.GetTaskListName(0), // default to use the tasklist with ID 0
		ExecutionStartToCloseTimeout:    time.Duration(timeoutInSeconds) * time.Second,
		DecisionTaskStartToCloseTimeout: time.Minute,
		WorkflowIDReusePolicy:           client.WorkflowIDReusePolicyAllowDuplicate,
	}

	activityInfo := activity.GetInfo(ctx)
	// Return the retry result rather than an error captured from inside the
	// closure: the closure maps "already started" to nil, and that decision
	// must be what the activity reports.
	return common.RetryOp(func() error {
		_, err := cc.StartWorkflow(
			ctx,
			workflowOptions,
			cronLauncherWorkflowName,
			activityInfo.WorkflowDomain,
			workflowExecution{
				WorkflowID: activityInfo.WorkflowExecution.ID,
				RunID:      activityInfo.WorkflowExecution.RunID,
			},
			testSuiteName,
			testConfigs,
			time.Now().UnixNano(),
		)
		if err == nil || cadence.IsWorkflowExecutionAlreadyStartedError(err) {
			// TODO: it's possible that the launcher workflow is started by another cron
			logger.Info("Started test suite", zap.String("test-suite", testSuiteName))
			return nil
		}

		logger.Error("Failed to start test suite", zap.String("test-suite", testSuiteName), zap.Error(err))
		return err
	}, nil)
}

// launcherWorkflow runs every test of a test suite sequentially, in a random
// order, as child workflows. After each test it signals the result to the
// parent cron workflow execution through signalResultActivity.
//
// If the random order cannot be generated, every test is reported as failed.
// The collected results are returned on success; if a result cannot be
// signaled the workflow stops and returns that error.
func launcherWorkflow(
	ctx workflow.Context,
	parentDomain string,
	parentExecution workflowExecution,
	testSuiteName string,
	testConfigs []lib.AggregatedTestConfig,
	scheduledTimeNanos int64,
) ([]testResult, error) {
	profile, err := lib.BeginWorkflow(ctx, cronLauncherWorkflowName, scheduledTimeNanos)
	if err != nil {
		return nil, err
	}

	testTimeout := workflow.GetInfo(ctx).ExecutionStartToCloseTimeoutSeconds
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
		RetryPolicy: &cadence.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2,
			// retry the signal activity until workflow timeout
			ExpirationInterval:       time.Duration(testTimeout) * time.Second,
			NonRetriableErrorReasons: []string{common.ErrReasonWorkflowNotExist},
		},
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	numTests := len(testConfigs)
	testResults := make([]testResult, 0, numTests)

	// run all tests in random order
	// The permutation is generated inside a SideEffect so the non-deterministic
	// rand call is wrapped by the workflow API instead of running inline.
	var testOrder []int
	err = workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
		return rand.Perm(len(testConfigs))
	}).Get(&testOrder)
	if err != nil {
		// Without a test order nothing can run: report every test as failed.
		for idx := range testConfigs {
			result := testResult{
				Name:       testConfigs[idx].Name,
				TestStatus: testStatusFailed,
				Details:    fmt.Sprintf("Failed to generate randomized test order: %v", err.Error()),
			}
			testResults = append(testResults, result)
			if err := workflow.ExecuteActivity(ctx, signalResultActivity, parentDomain, parentExecution, result).Get(ctx, nil); err != nil {
				workflow.GetLogger(ctx).Error("Failed to signal test result", zap.Error(err))
				return nil, profile.End(err)
			}
		}
		// Close the profile here as well, like every other exit after
		// BeginWorkflow succeeded.
		return testResults, profile.End(nil)
	}

	for _, idx := range testOrder {
		testConfig := testConfigs[idx]
		cwo := workflow.ChildWorkflowOptions{
			// idx keeps IDs unique when a suite lists the same test name twice.
			WorkflowID:                   testSuiteName + "-" + testConfig.Name + "-" + strconv.Itoa(idx),
			TaskList:                     common.GetTaskListName(0), // default to use the tasklist with ID 0
			ExecutionStartToCloseTimeout: time.Duration(testConfig.TimeoutInSeconds) * time.Second,
			TaskStartToCloseTimeout:      time.Minute,
			ParentClosePolicy:            client.ParentClosePolicyTerminate,
			WorkflowIDReusePolicy:        client.WorkflowIDReusePolicyAllowDuplicate,
		}
		childCtx := workflow.WithChildOptions(ctx, cwo)

		// Dispatch on the test name; childFuture stays nil for unknown tests.
		var childFuture workflow.Future
		switch testConfig.Name {
		case basic.TestName:
			childFuture = workflow.ExecuteChildWorkflow(childCtx, basic.LauncherWorkflowName, *testConfig.Basic)
		case signal.TestName:
			childFuture = workflow.ExecuteChildWorkflow(childCtx, signal.LauncherWorkflowName, *testConfig.Signal)
		case timer.TestName:
			childFuture = workflow.ExecuteChildWorkflow(childCtx, timer.LauncherWorkflowName, *testConfig.Timer)
		case concurrentexec.TestName:
			childFuture = workflow.ExecuteChildWorkflow(childCtx, concurrentexec.LauncherWorkflowName, *testConfig.ConcurrentExec)
		case cancellation.TestName:
			childFuture = workflow.ExecuteChildWorkflow(childCtx, cancellation.LauncherWorkflowName, *testConfig.Cancellation)
		default:
			workflow.GetLogger(ctx).Error("Unknown test name", zap.String("test-name", testConfig.Name))
		}

		result := testResult{
			Name:        testSuiteName + "::" + testConfig.Name,
			Description: testConfig.Description,
		}
		if childFuture == nil {
			result.TestStatus = testStatusFailed
			result.Details = "Unknown test"
		} else if err := childFuture.Get(childCtx, nil); err != nil {
			result.TestStatus = testStatusFailed
			result.Details = err.Error()
			// Append the custom error's string details, when present, so the
			// failure message carries the child's own explanation.
			if customErr, ok := err.(*cadence.CustomError); ok {
				var detailStr string
				if err := customErr.Details(&detailStr); err == nil {
					result.Details += ": " + detailStr
				}
			}
		} else {
			result.TestStatus = testStatusPassed
		}

		testResults = append(testResults, result)
		if err := workflow.ExecuteActivity(ctx, signalResultActivity, parentDomain, parentExecution, result).Get(ctx, nil); err != nil {
			workflow.GetLogger(ctx).Error("Failed to signal test result", zap.String("test-name", testConfig.Name), zap.Error(err))
			return nil, profile.End(err)
		}
	}

	return testResults, profile.End(nil)
}

// signalResultActivity sends a single test result to the target workflow
// execution as a test-result signal.
//
// If the signal cannot be delivered, the error is returned as a custom error
// with reason common.ErrReasonWorkflowNotExist, which launcherWorkflow lists
// as non-retriable in its activity retry policy.
func signalResultActivity(
	ctx context.Context,
	targetDomain string,
	targetWorkflowExecution workflowExecution,
	result testResult,
) error {
	logger := activity.GetLogger(ctx)

	rc := ctx.Value(lib.CtxKeyRuntimeContext).(*lib.RuntimeContext)
	cc, err := lib.NewCadenceClientForDomain(rc, targetDomain)
	if err != nil {
		logger.Error("Failed to create cadence client", zap.String("domain", targetDomain), zap.Error(err))
		return err
	}

	if err = common.RetryOp(func() error {
		return cc.SignalWorkflow(
			ctx,
			targetWorkflowExecution.WorkflowID,
			targetWorkflowExecution.RunID,
			testResultSignalName,
			result,
		)
	}, common.IsNonRetryableError); err != nil {
		logger.Error("Failed to signal test result back to workflow",
			zap.String("domain", targetDomain),
			zap.String("workflowID", targetWorkflowExecution.WorkflowID),
			zap.String("runID", targetWorkflowExecution.RunID),
			zap.Error(err),
		)
		return cadence.NewCustomError(common.ErrReasonWorkflowNotExist, err.Error())
	}

	return nil
}