bench/load/signal/workflow.go (444 lines of code) (raw):
// Copyright (c) 2017-2021 Uber Technologies Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package signal
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"time"
"go.uber.org/cadence"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/client"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
"golang.org/x/time/rate"
"github.com/uber/cadence/bench/lib"
"github.com/uber/cadence/bench/load/common"
c "github.com/uber/cadence/common"
)
const (
// TestName is the test name
TestName = "signal"
// LauncherWorkflowName is the workflow name for launching load test
// (the name loadTestWorkflow is registered under).
LauncherWorkflowName = "signal-load-test-workflow"
// loadTestActivityName and verifyResultActivityName are the names under which
// the launcher-side activities are registered and invoked.
loadTestActivityName = "loadTestActivity"
verifyResultActivityName = "verifyResultActivity"
// processSignalWorkflowName is the registered name of the target workflow; it
// is also used as the workflow type when listing and counting executions.
processSignalWorkflowName = "processSignalWorkflow"
// stressTestSignalName is the signal name used for load signals and exit signals.
stressTestSignalName = "stressSignalName"
// processSignalActivityName is the registered name of processSignalActivity.
processSignalActivityName = "processSignalActivity"
// exitSignalNumber is the SignalNumber that makes processSignalWorkflow leave
// its receive loop and complete.
exitSignalNumber = -1
// failedSignalWorkflowQuery is the format string of the count query issued by
// verifyResultActivity. Its verbs are, in order: workflow type, lower bound
// for StartTime, upper bound for CloseTime.
// NOTE(review): CloseStatus=1 presumably selects failed executions - confirm
// against the visibility query documentation.
failedSignalWorkflowQuery = "WorkflowType='%v' and CloseStatus=1 and StartTime > %v and CloseTime < %v"
)
var (
// maxPageSize is the page size requested when verifyResultActivity lists open
// workflows. The request field is set to the address of this variable.
maxPageSize = int32(10)
)
// RegisterLauncher registers the launcher side of the signal load test on w:
// the launcher workflow and the load-generating and result-verifying
// activities, each under its fixed registration name.
func RegisterLauncher(w worker.Worker) {
	launcherOpts := workflow.RegisterOptions{Name: LauncherWorkflowName}
	w.RegisterWorkflowWithOptions(loadTestWorkflow, launcherOpts)

	loadOpts := activity.RegisterOptions{Name: loadTestActivityName}
	w.RegisterActivityWithOptions(loadTestActivity, loadOpts)

	verifyOpts := activity.RegisterOptions{Name: verifyResultActivityName}
	w.RegisterActivityWithOptions(verifyResultActivity, verifyOpts)
}
// RegisterWorker registers the target side of the signal load test on w: the
// signal-processing workflow and the activity it schedules, each under its
// fixed registration name.
func RegisterWorker(w worker.Worker) {
	w.RegisterWorkflowWithOptions(
		processSignalWorkflow,
		workflow.RegisterOptions{Name: processSignalWorkflowName},
	)
	w.RegisterActivityWithOptions(
		processSignalActivity,
		activity.RegisterOptions{Name: processSignalActivityName},
	)
}
type (
// loadTestActivityParams is the input of loadTestActivity. loadTestWorkflow
// builds one value per loader.
loadTestActivityParams struct {
// Execution and decision task timeouts, in seconds, of the target workflows
// started by the loader.
WorkflowExecutionTimeoutInSeconds int
DecisionTaskTimeoutInSeconds int
// CampaignCount, ActionRate and FailureRate are forwarded unchanged into the
// target workflow's configuration.
CampaignCount int
ActionRate float64
FailureRate float64
// StartingWorkflowID is the first numeric workflow index of this loader's
// batch; BatchWorkflowCount is the number of workflows in the batch.
StartingWorkflowID int
BatchWorkflowCount int
// SignalCount is multiplied by BatchWorkflowCount to get the total number of
// load signals the loader sends.
SignalCount int
// SignalDataSize is the length in bytes of the payload carried by each signal.
SignalDataSize int
// RateLimit is the limit handed to the loader's rate limiter.
RateLimit int
// SignalCountBeforeContinueAsNew is forwarded to the target workflow as
// SignalBeforeContinueAsNew.
SignalCountBeforeContinueAsNew int
// EnableRollingWindow makes the loader pick target workflows with getRandomID
// instead of uniformly from the whole batch.
EnableRollingWindow bool
// MaxSignalDelayInSeconds and MaxSignalDelayCount are forwarded unchanged
// into the target workflow's configuration.
MaxSignalDelayInSeconds int
MaxSignalDelayCount int
}
// signalActivityResult is the result of loadTestActivity: how many signal
// requests succeeded and how many failed.
signalActivityResult struct {
SucceedCount int
FailureCount int
}
// signalEvent is the payload of a signal sent to processSignalWorkflow.
signalEvent struct {
// SignalNumber is the index of the load signal, or exitSignalNumber (-1) for
// the exit signal.
SignalNumber int
// Data is the filler payload.
Data []byte
// Timestamp is the sender's time in Unix nanoseconds, taken when the load
// signal is built. The exit signal leaves it at zero.
Timestamp int64
}
// verifyActivityParams is the input of verifyResultActivity.
verifyActivityParams struct {
// FailedWorkflowCount is the highest count of matching failed workflows that
// still passes verification.
FailedWorkflowCount int64
// WorkflowStartTime is the launcher workflow's start time in Unix
// nanoseconds; it is the lower time bound of the verification queries.
WorkflowStartTime int64
}
)
// loadTestWorkflow is the launcher workflow of the signal load test.
//
// It divides params.LoadTestWorkflowCount by params.LoaderCount (integer
// division) to get the batch size, starts one loadTestActivity per loader
// concurrently and waits for all of them, sleeps five minutes, and finally
// runs verifyResultActivity. A failing loader activity is only logged. The
// workflow fails when the sleep or the verification fails; both outcomes are
// reported to profile.End.
//
// A non-positive params.LoaderCount is rejected before any activity is
// scheduled, because it would otherwise cause an integer division by zero.
func loadTestWorkflow(ctx workflow.Context, params lib.SignalTestConfig) error {
	info := workflow.GetInfo(ctx)
	startTime := workflow.Now(ctx)
	expiration := time.Duration(info.ExecutionStartToCloseTimeoutSeconds) * time.Second

	profile, err := lib.BeginWorkflow(ctx, LauncherWorkflowName, startTime.UnixNano())
	if err != nil {
		return err
	}

	if params.LoaderCount <= 0 {
		cfgErr := fmt.Errorf("invalid LoaderCount %v: must be positive", params.LoaderCount)
		profile.End(cfgErr)
		return cfgErr
	}

	retryPolicy := &cadence.RetryPolicy{
		InitialInterval:          time.Second * 5,
		BackoffCoefficient:       1, // always backoff 5s
		MaximumInterval:          time.Second * 5,
		ExpirationInterval:       expiration,
		MaximumAttempts:          2000,
		NonRetriableErrorReasons: []string{common.ErrReasonValidationFailed},
	}
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: expiration,
		StartToCloseTimeout:    expiration,
		HeartbeatTimeout:       time.Second * 30,
		RetryPolicy:            retryPolicy,
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	batchSize := params.LoadTestWorkflowCount / params.LoaderCount
	doneCh := workflow.NewChannel(ctx)
	for i := 0; i < params.LoaderCount; i++ {
		// Declared inside the loop so that every coroutine gets its own copy.
		loaderParams := loadTestActivityParams{
			StartingWorkflowID:                i * batchSize,
			BatchWorkflowCount:                batchSize,
			WorkflowExecutionTimeoutInSeconds: params.WorkflowExecutionTimeoutInSeconds,
			DecisionTaskTimeoutInSeconds:      params.DecisionTaskTimeoutInSeconds,
			CampaignCount:                     params.CampaignCount,
			ActionRate:                        params.ActionRate,
			FailureRate:                       params.FailureRate,
			RateLimit:                         params.RateLimit,
			SignalCount:                       params.SignalCount,
			SignalDataSize:                    params.SignalDataSize,
			SignalCountBeforeContinueAsNew:    params.SignalBeforeContinueAsNew,
			EnableRollingWindow:               params.EnableRollingWindow,
			MaxSignalDelayInSeconds:           params.MaxSignalDelayInSeconds,
			MaxSignalDelayCount:               params.MaxSignalDelayCount,
		}
		workflow.Go(ctx, func(ctx workflow.Context) {
			var activityResult signalActivityResult
			err := workflow.ExecuteActivity(ctx, loadTestActivityName, loaderParams).Get(ctx, &activityResult)
			if err != nil {
				workflow.GetLogger(ctx).Info("signal LoadTestActivity failed", zap.Error(err))
			} else {
				workflow.GetLogger(ctx).Info("signal LoadTestActivity completed")
			}
			// Always report completion so the launcher never waits forever.
			doneCh.Send(ctx, "done")
		})
	}

	// Wait for every loader, successful or not.
	for i := 0; i < params.LoaderCount; i++ {
		doneCh.Receive(ctx, nil)
	}

	if err := workflow.Sleep(ctx, time.Minute*5); err != nil {
		profile.End(err)
		return err
	}

	actInput := verifyActivityParams{
		FailedWorkflowCount: int64(float64(params.LoadTestWorkflowCount) * params.FailureThreshold),
		WorkflowStartTime:   startTime.UnixNano(),
	}
	err = workflow.ExecuteActivity(ctx, verifyResultActivityName, actInput).Get(ctx, nil)
	return profile.End(err)
}
// loadTestActivity sends the signal load for one batch of target workflows.
//
// Phase 1 sends params.SignalCount * params.BatchWorkflowCount load signals
// with SignalWithStartWorkflow. Each signal goes to a workflow of the batch
// picked at random, or picked by getRandomID when params.EnableRollingWindow
// is set. Phase 2 sends one exit signal to every workflow of the batch.
//
// Progress is heartbeated after every request, and a retried attempt resumes
// right after the last heartbeated index. Indexes below the total signal
// count belong to phase 1; higher indexes are the total signal count plus the
// position within phase 2.
//
// Every request first waits on a rate limiter built from params.RateLimit.
// The returned counts cover the current attempt only. An error is returned
// when the activity context is done, or when load signals remain to be sent
// but the configured task list count is not positive.
func loadTestActivity(ctx context.Context, params loadTestActivityParams) (signalActivityResult, error) {
	info := activity.GetInfo(ctx)
	logger := activity.GetLogger(ctx)
	cc := ctx.Value(lib.CtxKeyCadenceClient).(lib.CadenceClient)
	rc := ctx.Value(lib.CtxKeyRuntimeContext).(*lib.RuntimeContext)
	numTaskList := rc.Bench.NumTaskLists
	loaderID := info.WorkflowExecution.ID
	limiter := rate.NewLimiter(rate.Limit(params.RateLimit), 1)

	workflowOptions := client.StartWorkflowOptions{
		ExecutionStartToCloseTimeout:    time.Second * time.Duration(params.WorkflowExecutionTimeoutInSeconds),
		DecisionTaskStartToCloseTimeout: time.Second * time.Duration(params.DecisionTaskTimeoutInSeconds),
		WorkflowIDReusePolicy:           client.WorkflowIDReusePolicyAllowDuplicate,
	}
	wfParams := lib.ProcessSignalWorkflowConfig{
		CampaignCount:             params.CampaignCount,
		ActionRate:                params.ActionRate,
		FailureRate:               params.FailureRate,
		SignalBeforeContinueAsNew: params.SignalCountBeforeContinueAsNew,
		MaxSignalDelayInSeconds:   params.MaxSignalDelayInSeconds,
		MaxSignalDelayCount:       params.MaxSignalDelayCount,
	}

	// Filler payload shared by all signals.
	data := make([]byte, params.SignalDataSize)
	for i := range data {
		data[i] = 'a'
	}

	totalSignalCount := params.SignalCount * params.BatchWorkflowCount
	succeedCount, failedCount := 0, 0

	// Resume after the last heartbeated index when this is a retried attempt.
	batchStartID := 0
	if activity.HasHeartbeatDetails(ctx) {
		var finishedID int
		if err := activity.GetHeartbeatDetails(ctx, &finishedID); err == nil {
			batchStartID = finishedID + 1
			logger.Info("recover from failed attempt", zap.Int("FinishedID", finishedID))
		}
	}

	logger.Info("start sending signals",
		zap.Int32("Attempt", info.Attempt),
		zap.Int("StartSignalCount", batchStartID),
		zap.Int("TotalSignalCount", totalSignalCount),
		zap.Int("BatchWorkflowCount", params.BatchWorkflowCount))

	// rand.Intn panics on a non-positive argument; report a clear error instead.
	if batchStartID < totalSignalCount && numTaskList <= 0 {
		return signalActivityResult{}, fmt.Errorf("invalid task list count %v: must be positive", numTaskList)
	}

	wid := -1
	for i := batchStartID; i < totalSignalCount; i++ {
		randTaskListID := rand.Intn(numTaskList)
		workflowOptions.TaskList = common.GetTaskListName(randTaskListID)
		if wid == -1 || rand.Float64() > 0.1 {
			// 10% chance we are going to send signal to the same workflow as last time, this is to simulate multiple
			// signals to same workflow within short time.
			wid = params.StartingWorkflowID + int(rand.Int31n(int32(params.BatchWorkflowCount)))

			/*
				If we have a large set of target workflows:
				We want to simulate the real scenario where signals come to a small set of users instead of randomly
				to any user in the entire user set. This set of active users moves around the globe as day time moves.
				So we define a 1 hour window around the current time and only select random users from this window.
			*/
			if params.EnableRollingWindow {
				wid = params.StartingWorkflowID + getRandomID(params.BatchWorkflowCount, time.Now())
			}
		}

		workflowID := getStressWorkflowID(loaderID, wid)
		signal := signalEvent{SignalNumber: i, Data: data, Timestamp: time.Now().UnixNano()}
		if err := limiter.Wait(ctx); err != nil {
			if ctx.Err() != nil {
				return signalActivityResult{}, ctx.Err()
			}
			// The limiter refused the wait although the context is still alive.
			// Account for the skipped signal instead of dropping it silently.
			logger.Warn("rate limiter wait failed, signal skipped", zap.Error(err))
			failedCount++
			continue
		}

		wfParams.ScheduleTimeNano = time.Now().UnixNano()
		_, err := cc.SignalWithStartWorkflow(ctx, workflowID, stressTestSignalName, signal, workflowOptions, processSignalWorkflowName, wfParams)
		if err == nil {
			logger.Debug("SignalWithStartWorkflow succeed", zap.String("workflowID", workflowID))
			succeedCount++
		} else {
			logger.Error("SignalWithStartWorkflow failed", zap.Error(err))
			failedCount++
		}
		activity.RecordHeartbeat(ctx, i)
		if ctx.Err() != nil {
			return signalActivityResult{}, ctx.Err()
		}
	}

	// send last signal to notify the target workflow to exit
	exitStartID := 0
	if batchStartID > totalSignalCount {
		// this means previous attempt failed after sending all normal signals, and failed while sending exit signal.
		exitStartID = batchStartID - totalSignalCount
	}
	for i := exitStartID; i < params.BatchWorkflowCount; i++ {
		wid := params.StartingWorkflowID + i
		workflowID := getStressWorkflowID(loaderID, wid)
		signal := signalEvent{SignalNumber: exitSignalNumber, Data: data}
		if err := limiter.Wait(ctx); err != nil {
			if ctx.Err() != nil {
				return signalActivityResult{}, ctx.Err()
			}
			logger.Warn("rate limiter wait failed, exit signal skipped", zap.Error(err))
			failedCount++
			continue
		}

		// Use the activity context so the call honors cancellation and deadline.
		err := cc.SignalWorkflow(ctx, workflowID, "", stressTestSignalName, signal)
		if err == nil {
			logger.Debug("SignalWorkflow succeed", zap.String("workflowID", workflowID))
			succeedCount++
		} else {
			logger.Error("SignalWorkflow failed", zap.Error(err))
			failedCount++
		}
		// Offset by totalSignalCount so a retry can tell phase 2 progress apart.
		activity.RecordHeartbeat(ctx, totalSignalCount+i)
		if ctx.Err() != nil {
			return signalActivityResult{}, ctx.Err()
		}
	}

	return signalActivityResult{SucceedCount: succeedCount, FailureCount: failedCount}, nil
}
// verifyResultActivity checks the outcome of a finished signal load test.
//
// It first lists open processSignalWorkflow executions started between
// params.WorkflowStartTime and now, and fails if there are any. It then counts
// the executions matching failedSignalWorkflowQuery for the same time range
// and fails if the count exceeds params.FailedWorkflowCount. Both validation
// failures carry common.ErrReasonValidationFailed as reason; errors from the
// client calls are returned unchanged.
func verifyResultActivity(ctx context.Context, params verifyActivityParams) error {
	cadenceClient := ctx.Value(lib.CtxKeyCadenceClient).(lib.CadenceClient)
	activityInfo := activity.GetInfo(ctx)

	// step 1. verify if any open workflow
	openReq := &shared.ListOpenWorkflowExecutionsRequest{
		Domain:          c.StringPtr(activityInfo.WorkflowDomain),
		MaximumPageSize: &maxPageSize,
		StartTimeFilter: &shared.StartTimeFilter{
			EarliestTime: c.Int64Ptr(params.WorkflowStartTime),
			LatestTime:   c.Int64Ptr(time.Now().UnixNano()),
		},
		TypeFilter: &shared.WorkflowTypeFilter{
			Name: c.StringPtr(processSignalWorkflowName),
		},
	}
	openResp, err := cadenceClient.ListOpenWorkflow(ctx, openReq)
	if err != nil {
		return err
	}
	if len(openResp.Executions) != 0 {
		return cadence.NewCustomError(
			common.ErrReasonValidationFailed,
			"found open workflows after signal load test completed",
		)
	}

	// step 2: check failed workflow reason
	countQuery := fmt.Sprintf(
		failedSignalWorkflowQuery,
		processSignalWorkflowName,
		params.WorkflowStartTime,
		time.Now().UnixNano(),
	)
	countResp, err := cadenceClient.CountWorkflow(ctx, &shared.CountWorkflowExecutionsRequest{
		Domain: c.StringPtr(activityInfo.WorkflowDomain),
		Query:  &countQuery,
	})
	if err != nil {
		return err
	}
	if countResp.GetCount() <= params.FailedWorkflowCount {
		return nil
	}
	return cadence.NewCustomError(
		common.ErrReasonValidationFailed,
		"found failed workflows after signal load test completed",
	)
}
// getStressWorkflowID builds the workflow ID of a target workflow from the
// loader's workflow ID and the numeric workflow index, in the form
// "<loaderID>-sync-stress-<wid>".
func getStressWorkflowID(loaderID string, wid int) string {
	const idFormat = "%v-sync-stress-%v"
	id := fmt.Sprintf(idFormat, loaderID, wid)
	return id
}
// processSignalWorkflow is the target workflow of the signal load test.
//
// It blocks until a signal arrives on stressTestSignalName and processes it
// with processSignal. It then drains further signals that are already
// buffered; once five signals have been handled in one batch it sleeps one
// second and goes back to the blocking receive. Errors from processSignal are
// logged and otherwise ignored.
//
// The workflow ends in one of three ways:
//   - a signal with SignalNumber exitSignalNumber arrives: it completes;
//   - at least params.SignalBeforeContinueAsNew signals were received in this
//     run: it continues as new with a refreshed ScheduleTimeNano;
//   - more than params.MaxSignalDelayCount drained signals are older than
//     params.MaxSignalDelayInSeconds: it fails with an error.
//
// Only signals obtained while draining are checked for delay; the signal
// returned by the blocking receive is not.
func processSignalWorkflow(ctx workflow.Context, params lib.ProcessSignalWorkflowConfig) error {
	logger := workflow.GetLogger(ctx)
	info := workflow.GetInfo(ctx)
	profile, err := lib.BeginWorkflow(ctx, processSignalWorkflowName, params.ScheduleTimeNano)
	if err != nil {
		return err
	}

	ch := workflow.GetSignalChannel(ctx, stressTestSignalName)
	totalSigCount := 0
	signalDelayCount := 0

mainLoop:
	for {
		var signal signalEvent
		ch.Receive(ctx, &signal)
		totalSigCount++
		if signal.SignalNumber == exitSignalNumber {
			break mainLoop
		}
		if err := processSignal(ctx, signal, params); err != nil {
			// log the error and continue
			logger.Error("sync api bench test process signal failed.", zap.Error(err))
		}

		batchSigCount := 1
		for ch.ReceiveAsync(&signal) {
			totalSigCount++
			batchSigCount++
			if signal.SignalNumber == exitSignalNumber {
				break mainLoop
			}

			delay := workflow.Now(ctx).Sub(time.Unix(0, signal.Timestamp))
			if delay > time.Duration(params.MaxSignalDelayInSeconds)*time.Second {
				signalDelayCount++
				if signalDelayCount > params.MaxSignalDelayCount {
					delayErr := fmt.Errorf("received %v signals are longer than %v seconds.", params.MaxSignalDelayCount, params.MaxSignalDelayInSeconds)
					// Close the profile on this exit path too, like every other one.
					profile.End(delayErr)
					return delayErr
				}
			}

			if err := processSignal(ctx, signal, params); err != nil {
				// log the error and continue
				logger.Error("sync stress workflow process signal failed.", zap.Error(err))
			}

			if batchSigCount >= 5 {
				logger.Info("force sleep 1s", zap.String("WorkflowID", info.WorkflowExecution.ID), zap.String("RunID", info.WorkflowExecution.RunID))
				if err := workflow.Sleep(ctx, time.Second); err != nil {
					logger.Error("Failed sleep", zap.Error(err))
				}
				break // leave the drain loop; the outer loop blocks on Receive again
			}
		}

		if totalSigCount >= params.SignalBeforeContinueAsNew {
			logger.Info("ContinueAsNew", zap.Int("ProcessedSignalCount", totalSigCount))
			profile.End(nil)
			params.ScheduleTimeNano = workflow.Now(ctx).UnixNano()
			return workflow.NewContinueAsNewError(ctx, processSignalWorkflowName, params)
		}
	}

	logger.Info("sync stress workflow completed")
	profile.End(nil)
	return nil
}
// processSignal handles a single signal inside the target workflow.
//
// For each of params.CampaignCount campaigns it runs the checkCondition local
// activity and waits for its verdict. Whenever the verdict is true it
// schedules processSignalActivity without waiting for it. After all campaigns
// are evaluated it waits for every scheduled activity in scheduling order.
// The first error from a local activity or an activity is returned
// immediately.
func processSignal(ctx workflow.Context, signal signalEvent, params lib.ProcessSignalWorkflowConfig) error {
	// One retry policy shared by the local activity and the regular activity.
	sharedRetry := &cadence.RetryPolicy{
		InitialInterval:    time.Second,
		BackoffCoefficient: 3,
		MaximumInterval:    time.Minute,
		ExpirationInterval: time.Minute * 5,
		MaximumAttempts:    5,
	}
	ctx = workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
		ScheduleToCloseTimeout: time.Second * 3,
		RetryPolicy:            sharedRetry,
	})
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Hour,
		StartToCloseTimeout:    time.Minute,
		RetryPolicy:            sharedRetry,
	})

	var (
		pending          []workflow.Future
		matchedCampaigns []string
	)
	for campaign := 0; campaign < params.CampaignCount; campaign++ {
		var matched bool
		verdict := workflow.ExecuteLocalActivity(ctx, checkCondition, params.ActionRate, params.FailureRate)
		if err := verdict.Get(ctx, &matched); err != nil {
			return err
		}
		if !matched {
			continue
		}
		scheduledAt := workflow.Now(ctx).UnixNano()
		action := workflow.ExecuteActivity(ctx, processSignalActivityName, signal, params.FailureRate, scheduledAt)
		pending = append(pending, action)
		matchedCampaigns = append(matchedCampaigns, strconv.Itoa(campaign))
	}

	// Wait for all scheduled actions; stop at the first failure.
	for _, action := range pending {
		var actionResult string
		if err := action.Get(ctx, &actionResult); err != nil {
			return err
		}
	}

	workflow.GetLogger(ctx).Info(
		"Processed signal",
		zap.Int("SignalNumber", signal.SignalNumber),
		zap.Int("ActionCount", len(matchedCampaigns)),
	)
	return nil
}
// checkCondition is the local activity that decides whether an action needs
// to be taken. It fails with a random error at probability failureRate, which
// is raised by 0.1 for every retry attempt already made. Otherwise it returns
// true at probability actionRate.
func checkCondition(ctx context.Context, actionRate float64, failureRate float64) (bool, error) {
	info := activity.GetInfo(ctx)
	logger := activity.GetLogger(ctx)

	if attempt := info.Attempt; attempt > 0 {
		// Retries fail more often: add 0.1 per attempt.
		failureRate += 0.1 * float64(attempt)
		logger.Info("Retry attempt", zap.Int32("Attempt", attempt), zap.Float64("FailureRate", failureRate))
	}

	failed := rand.Float64() < failureRate
	if failed {
		return false, errors.New("failed by rand")
	}

	takeAction := rand.Float64() < actionRate
	return takeAction, nil
}
// processSignalActivity is a regular activity that simulates the actual
// action taken for a signal. It records activity start and end through lib,
// then fails with a random error at probability failureRate, which is raised
// by 0.1 for every retry attempt already made.
//
// evt is the signal that triggered the action; it is not inspected here.
// scheduleTimeNano is passed on to lib.RecordActivityStart.
//
// It panics when the activity service config carries no metrics scope.
func processSignalActivity(
	ctx context.Context,
	evt signalEvent,
	failureRate float64,
	scheduleTimeNano int64,
) (err error) {
	info := activity.GetInfo(ctx)
	logger := activity.GetLogger(ctx)
	svcConfig := common.GetActivityServiceConfig(ctx)
	scope := svcConfig.Metrics
	if scope == nil {
		panic("metrics client is not set")
	}

	scope, stopWatch := lib.RecordActivityStart(scope, processSignalActivityName, scheduleTimeNano)
	// Arguments of a deferred call are evaluated when the defer statement runs,
	// so passing err directly would always record nil. The closure reads the
	// named result when the function returns.
	defer func() {
		lib.RecordActivityEnd(scope, stopWatch, err)
	}()

	if info.Attempt > 0 {
		failureRate += 0.1 * float64(info.Attempt) // increase failure rate for retry attempt
		logger.Info("Retry attempt", zap.Int32("Attempt", info.Attempt), zap.Float64("FailureRate", failureRate))
	}
	if rand.Float64() < failureRate {
		return errors.New("failed by rand")
	}
	return nil
}
// secondsInADay is the number of seconds in 24 hours.
var secondsInADay = 24 * 60 * 60

// getRandomID returns a random ID in [0, totalCount) taken from a window that
// follows the time of day of now.
//
// The ID space is mapped onto the 24 hours of a day. The window is
// totalCount/24 IDs wide (one hour's share) and begins at the ID that
// corresponds to 30 minutes before now; IDs wrap around at both ends of the
// range. Only the hour, minute and second of now are used.
//
// It returns 0 when totalCount is not positive, because there is no valid ID
// to pick and the modulo below would otherwise divide by zero.
func getRandomID(totalCount int, now time.Time) int {
	if totalCount <= 0 {
		return 0
	}

	r := rand.Float64()
	windowSize := totalCount / 24 // one hour's share of the ID space
	seconds := now.Second() + now.Minute()*60 + now.Hour()*3600
	// Negative in the first 30 minutes of the day; the modulo below wraps it.
	beginOfWindowID := int64(seconds-1800) * int64(totalCount) / int64(secondsInADay)
	wid := int(beginOfWindowID + int64(r*float64(windowSize)))
	return (wid + totalCount) % totalCount
}