tools/cli/workflow.go (539 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package cli import ( "strings" "time" "github.com/urfave/cli" "github.com/uber/cadence/service/worker/batcher" ) func newWorkflowCommands() []cli.Command { return []cli.Command{ { Name: "restart", Aliases: []string{"res"}, Usage: "restarts a previous workflow execution", Flags: flagsForExecution, Action: func(c *cli.Context) { RestartWorkflow(c) }, }, { Name: "activity", Aliases: []string{"act"}, Usage: "operate activities of workflow", Subcommands: newActivityCommands(), }, { Name: "show", Usage: "show workflow history", Flags: getFlagsForShow(), Action: func(c *cli.Context) { ShowHistory(c) }, }, { Name: "showid", Usage: "show workflow history with given workflow_id and run_id (a shortcut of `show -w <wid> -r <rid>`). run_id is only required for archived history", Description: "cadence workflow showid <workflow_id> <run_id>. workflow_id is required; run_id is only required for archived history", Flags: getFlagsForShowID(), Action: func(c *cli.Context) { ShowHistoryWithWID(c) }, }, { Name: "start", Usage: "start a new workflow execution", Flags: getFlagsForStart(), Action: func(c *cli.Context) { StartWorkflow(c) }, }, { Name: "run", Usage: "start a new workflow execution and get workflow progress", Flags: getFlagsForRun(), Action: func(c *cli.Context) { RunWorkflow(c) }, }, { Name: "cancel", Aliases: []string{"c"}, Usage: "cancel a workflow execution", Flags: getFlagsForCancel(), Action: func(c *cli.Context) { CancelWorkflow(c) }, }, { Name: "signal", Aliases: []string{"s"}, Usage: "signal a workflow execution", Flags: getFlagsForSignal(), Action: func(c *cli.Context) { SignalWorkflow(c) }, }, { Name: "signalwithstart", Usage: "signal the current open workflow if exists, or attempt to start a new run based on IDResuePolicy and signals it", Flags: getFlagsForSignalWithStart(), Action: func(c *cli.Context) { SignalWithStartWorkflowExecution(c) }, }, { Name: "terminate", Aliases: []string{"term"}, Usage: "terminate a new workflow execution", Flags: getFlagsForTerminate(), Action: func(c *cli.Context) { TerminateWorkflow(c) }, }, { Name: "list", Aliases: []string{"l"}, Usage: "list open or closed workflow executions", Description: "list one page (default size 10 items) by default, use flag --pagesize to change page size", Flags: getFlagsForList(), Action: func(c *cli.Context) { ListWorkflow(c) }, }, { Name: "listall", Aliases: []string{"la"}, Usage: "list all open or closed workflow executions", Flags: getFlagsForListAll(), Action: func(c *cli.Context) { ListAllWorkflow(c) }, }, { Name: "listarchived", Usage: "list archived workflow executions", Flags: getFlagsForListArchived(), Action: func(c *cli.Context) { ListArchivedWorkflow(c) }, }, { Name: "scan", Aliases: []string{"sc", "scanall"}, Usage: "scan workflow executions (need to enable Cadence server on ElasticSearch). " + "It will be faster than listall, but result are not sorted.", Flags: getFlagsForScan(), Action: func(c *cli.Context) { ScanAllWorkflow(c) }, }, { Name: "count", Aliases: []string{"cnt"}, Usage: "count number of workflow executions (need to enable Cadence server on ElasticSearch)", Flags: getFlagsForCount(), Action: func(c *cli.Context) { CountWorkflow(c) }, }, { Name: "query", Usage: "query workflow execution", Description: "query result will be printed as JSON", Flags: getFlagsForQuery(), Action: func(c *cli.Context) { QueryWorkflow(c) }, }, { Name: "query-types", Usage: "list all available query types", Flags: getFlagsForStack(), Action: func(c *cli.Context) { QueryWorkflowUsingQueryTypes(c) }, }, { Name: "stack", Usage: "query workflow execution with __stack_trace as query type", Flags: getFlagsForStack(), Action: func(c *cli.Context) { QueryWorkflowUsingStackTrace(c) }, }, { Name: "describe", Aliases: []string{"desc"}, Usage: "show information of workflow execution", Flags: getFlagsForDescribe(), Action: func(c *cli.Context) { DescribeWorkflow(c) }, }, { Name: "describeid", Aliases: []string{"descid"}, Usage: "show information of workflow execution with given workflow_id and optional run_id (a shortcut of `describe -w <wid> -r <rid>`)", Description: "cadence workflow describeid <workflow_id> <run_id>. workflow_id is required; run_id is optional", Flags: getFlagsForDescribeID(), Action: func(c *cli.Context) { DescribeWorkflowWithID(c) }, }, { Name: "observe", Aliases: []string{"ob"}, Usage: "show the progress of workflow history", Flags: getFlagsForObserve(), Action: func(c *cli.Context) { ObserveHistory(c) }, }, { Name: "observeid", Aliases: []string{"obid"}, Usage: "show the progress of workflow history with given workflow_id and optional run_id (a shortcut of `observe -w <wid> -r <rid>`)", Flags: getFlagsForObserveID(), Action: func(c *cli.Context) { ObserveHistoryWithID(c) }, }, { Name: "reset", Aliases: []string{"rs"}, Usage: "reset the workflow, by either eventID or resetType.", Flags: []cli.Flag{ cli.StringFlag{ Name: FlagWorkflowIDWithAlias, Usage: "WorkflowID, required", }, cli.StringFlag{ Name: FlagRunIDWithAlias, Usage: "RunID, optional, default to the current/latest RunID", }, cli.StringFlag{ Name: FlagEventID, Usage: "The eventID of any event after DecisionTaskStarted you want to reset to (this event is exclusive in a new run. The new run " + "history will fork and continue from the previous eventID of this). It can be DecisionTaskCompleted, DecisionTaskFailed or others", }, cli.StringFlag{ Name: FlagReason, Usage: "reason to do the reset, required for tracking purpose", }, cli.StringFlag{ Name: FlagResetType, Usage: "where to reset. Support one of these: " + strings.Join(mapKeysToArray(resetTypesMap), ","), }, cli.StringFlag{ Name: FlagDecisionOffset, Usage: "based on the reset point calculated by resetType, this offset will move/offset the point by decision. Currently only negative number is supported, and only works with LastDecisionCompleted.", }, cli.StringFlag{ Name: FlagResetBadBinaryChecksum, Usage: "Binary checksum for resetType of BadBinary", }, cli.StringFlag{ Name: FlagEarliestTimeWithAlias, Usage: "EarliestTime of decision start time, required for resetType of DecisionCompletedTime." + "Supported formats are '2006-01-02T15:04:05+07:00', raw UnixNano and " + "time range (N<duration>), where 0 < N < 1000000 and duration (full-notation/short-notation) can be second/s, " + "minute/m, hour/h, day/d, week/w, month/M or year/y. For example, '15minute' or '15m' implies last 15 minutes, " + "meaning that workflow will be reset to the first decision that completed in last 15 minutes.", }, cli.BoolFlag{ Name: FlagSkipSignalReapply, Usage: "whether or not skipping signals reapply after the reset point", }, }, Action: func(c *cli.Context) { ResetWorkflow(c) }, }, { Name: "reset-batch", Usage: "reset workflow in batch by resetType: " + strings.Join(mapKeysToArray(resetTypesMap), ",") + "To get base workflowIDs/runIDs to reset, source is from input file or visibility query.", ArgsUsage: "\n\t To reset workflows specify --input_file <csv_file> of workflow_id and run_id and run: cadence wf reset-batch --input_file <csv_file>", Flags: []cli.Flag{ cli.StringFlag{ Name: FlagInputFileWithAlias, Usage: "Input file to use for resetting, one workflow per line of WorkflowID and RunID. RunID is optional, default to current runID if not specified. ", }, cli.StringFlag{ Name: FlagListQueryWithAlias, Usage: "visibility query to get workflows to reset", }, cli.StringFlag{ Name: FlagExcludeFile, Value: "", Usage: "Another input file to use for excluding from resetting, only workflowID is needed.", }, cli.StringFlag{ Name: FlagExcludeWorkflowIDByQuery, Usage: "Another visibility SQL like query, but for excluding the results by workflowIDs. This is useful because a single query cannot do join operation. One use case is to " + "find failed workflows excluding any workflow that has another run that is open or completed.", }, cli.StringFlag{ Name: FlagInputSeparator, Value: "\t", Usage: "Separator for input file(default to tab)", }, cli.StringFlag{ Name: FlagReason, Usage: "Reason for reset, required for tracking purpose", }, cli.IntFlag{ Name: FlagParallismDeprecated, Value: 1, Usage: "Number of goroutines to run in parallel. Each goroutine would process one line for every second.", Hidden: true, }, cli.IntFlag{ Name: FlagParallelism, Value: 1, Usage: "Number of goroutines to run in parallel. Each goroutine would process one line for every second.", }, cli.BoolFlag{ Name: FlagSkipCurrentOpen, Usage: "Skip the workflow if the current run is open for the same workflowID as base.", }, cli.BoolFlag{ Name: FlagSkipCurrentCompleted, Usage: "Skip the workflow if the current run is completed for the same workflowID as base.", }, cli.BoolFlag{ Name: FlagSkipBaseIsNotCurrent, // TODO https://github.com/uber/cadence/issues/2930 // The right way to prevent needs server side implementation . // This client side is only best effort Usage: "Skip if base run is not current run.", }, cli.BoolFlag{ Name: FlagNonDeterministicOnly, Usage: "Only apply onto workflows whose last event is decisionTaskFailed with non deterministic error.", }, cli.BoolFlag{ Name: FlagDryRun, Usage: "Not do real action of reset(just logging in STDOUT)", }, cli.StringFlag{ Name: FlagResetType, Usage: "where to reset. Support one of these: " + strings.Join(mapKeysToArray(resetTypesMap), ","), }, cli.StringFlag{ Name: FlagDecisionOffset, Usage: "based on the reset point calculated by resetType, this offset will move/offset the point by decision. " + "Limitation: currently only negative number is supported, and only works with LastDecisionCompleted.", }, cli.StringFlag{ Name: FlagResetBadBinaryChecksum, Usage: "Binary checksum for resetType of BadBinary", }, cli.BoolFlag{ Name: FlagSkipSignalReapply, Usage: "whether or not skipping signals reapply after the reset point", }, cli.StringFlag{ Name: FlagEarliestTimeWithAlias, Usage: "EarliestTime of decision start time, required for resetType of DecisionCompletedTime." + "Supported formats are '2006-01-02T15:04:05+07:00', raw UnixNano and " + "time range (N<duration>), where 0 < N < 1000000 and duration (full-notation/short-notation) can be second/s, " + "minute/m, hour/h, day/d, week/w, month/M or year/y. For example, '15minute' or '15m' implies last 15 minutes, " + "meaning that workflow will be reset to the first decision that completed in last 15 minutes.", }, }, Action: func(c *cli.Context) { ResetInBatch(c) }, }, { Name: "batch", Usage: "batch operation on a list of workflows from query.", Subcommands: newBatchCommands(), ArgsUsage: "\n\t To make a batch operation use wf batch start command and specify --batch_type to terminate/signal/cancel workflows.\n" + "\t ex: to batch terminate workflows run: cadence batch start --batch_type terminate --query <targeted_workflows_query>\n" + "\t cadence wf batch terminate - is used to terminate a batch operation not workflows.\n" + "\t To inspect the progress run: cadence wf batch desc --job_id <your_job_id>", }, } } func newActivityCommands() []cli.Command { return []cli.Command{ { Name: "complete", Aliases: []string{"comp"}, Usage: "complete an activity", Flags: []cli.Flag{ cli.StringFlag{ Name: FlagWorkflowIDWithAlias, Usage: "WorkflowID", }, cli.StringFlag{ Name: FlagRunIDWithAlias, Usage: "RunID", }, cli.StringFlag{ Name: FlagActivityIDWithAlias, Usage: "The activityID to operate on", }, cli.StringFlag{ Name: FlagResult, Usage: "Result of the activity", }, cli.StringFlag{ Name: FlagIdentity, Usage: "Identity of the operator", }, }, Action: func(c *cli.Context) { CompleteActivity(c) }, }, { Name: "fail", Usage: "fail an activity", Flags: []cli.Flag{ cli.StringFlag{ Name: FlagWorkflowIDWithAlias, Usage: "WorkflowID", }, cli.StringFlag{ Name: FlagRunIDWithAlias, Usage: "RunID", }, cli.StringFlag{ Name: FlagActivityIDWithAlias, Usage: "The activityID to operate on", }, cli.StringFlag{ Name: FlagReason, Usage: "Reason to fail the activity", }, cli.StringFlag{ Name: FlagDetail, Usage: "Detail to fail the activity", }, cli.StringFlag{ Name: FlagIdentity, Usage: "Identity of the operator", }, }, Action: func(c *cli.Context) { FailActivity(c) }, }, } } func newBatchCommands() []cli.Command { return []cli.Command{ { Name: "describe", Aliases: []string{"desc"}, Usage: "Describe a batch operation job", Flags: []cli.Flag{ cli.StringFlag{ Name: FlagJobIDWithAlias, Usage: "Batch Job ID", }, }, Action: func(c *cli.Context) { DescribeBatchJob(c) }, }, { Name: "terminate", Usage: "terminate a batch operation job", Flags: []cli.Flag{ cli.StringFlag{ Name: FlagJobIDWithAlias, Usage: "Batch Job ID", }, cli.StringFlag{ Name: FlagReasonWithAlias, Usage: "Reason to stop this batch job", }, }, Action: func(c *cli.Context) { TerminateBatchJob(c) }, }, { Name: "list", Aliases: []string{"l"}, Usage: "Describe a batch operation job", Flags: []cli.Flag{ cli.IntFlag{ Name: FlagPageSizeWithAlias, Value: 30, Usage: "Result page size", }, }, Action: func(c *cli.Context) { ListBatchJobs(c) }, }, { Name: "start", Usage: "Start a batch operation job", Flags: []cli.Flag{ cli.StringFlag{ Name: FlagListQueryWithAlias, Usage: "Query to get workflows for being executed this batch operation", }, cli.StringFlag{ Name: FlagReasonWithAlias, Usage: "Reason to run this batch job", }, cli.StringFlag{ Name: FlagBatchTypeWithAlias, Usage: "Types supported: " + strings.Join(batcher.AllBatchTypes, ","), }, // below are optional cli.StringFlag{ Name: FlagSignalNameWithAlias, Usage: "Required for batch signal", }, cli.StringFlag{ Name: FlagInputWithAlias, Usage: "Optional input of signal", }, cli.StringFlag{ Name: FlagSourceClusterWithAlias, Usage: "Required for batch replicate", }, cli.StringFlag{ Name: FlagTargetClusterWithAlias, Usage: "Required for batch replicate", }, cli.IntFlag{ Name: FlagRPS, Value: batcher.DefaultRPS, Usage: "RPS of processing", }, cli.BoolFlag{ Name: FlagYes, Usage: "Optional flag to disable confirmation prompt", }, cli.IntFlag{ Name: FlagPageSize, Value: batcher.DefaultPageSize, Usage: "PageSize of processiing", }, cli.IntFlag{ Name: FlagRetryAttempts, Value: batcher.DefaultAttemptsOnRetryableError, Usage: "Retry attempts for retriable errors", }, cli.IntFlag{ Name: FlagActivityHeartBeatTimeoutWithAlias, Value: int(batcher.DefaultActivityHeartBeatTimeout / time.Second), Usage: "Heartbeat timeout for batcher activity in seconds", }, cli.IntFlag{ Name: FlagConcurrency, Value: batcher.DefaultConcurrency, Usage: "Concurrency of batch activity", }, }, Action: func(c *cli.Context) { StartBatchJob(c) }, }, } }