tools/cli/admin.go (1,379 lines of code) (raw):
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cli
import (
"fmt"
"strings"
"time"
"github.com/urfave/cli"
"github.com/uber/cadence/common/reconciliation/invariant"
"github.com/uber/cadence/service/worker/scanner/executions"
)
// newAdminWorkflowCommands returns the subcommands of the admin workflow
// command group: show, describe, refresh-tasks, delete and fix_corruption.
// Each Action only forwards the cli context to the matching Admin* handler.
func newAdminWorkflowCommands() []cli.Command {
	// WorkflowID and RunID are declared identically by several subcommands,
	// so they are defined once and reused by value.
	workflowIDFlag := cli.StringFlag{Name: FlagWorkflowIDWithAlias, Usage: "WorkflowID"}
	runIDFlag := cli.StringFlag{Name: FlagRunIDWithAlias, Usage: "RunID"}

	return []cli.Command{
		{
			// No Aliases: the only alias previously listed was "show", which
			// merely repeated Name.
			Name:  "show",
			Usage: "show workflow history from database",
			Flags: append(getDBFlags(),
				// v2 history events
				cli.StringFlag{Name: FlagTreeID, Usage: "TreeID"},
				cli.StringFlag{Name: FlagBranchID, Usage: "BranchID"},
				cli.Int64Flag{Name: FlagMinEventID, Value: 1, Usage: "MinEventID"},
				cli.Int64Flag{Name: FlagMaxEventID, Value: 10000, Usage: "MaxEventID"},
				cli.StringFlag{Name: FlagOutputFilenameWithAlias, Usage: "output file"},
				// support mysql query
				cli.IntFlag{Name: FlagShardIDWithAlias, Usage: "ShardID"},
			),
			Action: func(c *cli.Context) {
				AdminShowWorkflow(c)
			},
		},
		{
			Name:    "describe",
			Aliases: []string{"desc"},
			Usage:   "Describe internal information of workflow execution",
			Flags:   []cli.Flag{workflowIDFlag, runIDFlag},
			Action: func(c *cli.Context) {
				AdminDescribeWorkflow(c)
			},
		},
		{
			Name:    "refresh-tasks",
			Aliases: []string{"rt"},
			Usage:   "Refreshes all the tasks of a workflow",
			Flags:   []cli.Flag{workflowIDFlag, runIDFlag},
			Action: func(c *cli.Context) {
				AdminRefreshWorkflowTasks(c)
			},
		},
		{
			Name:    "delete",
			Aliases: []string{"del"},
			Usage:   "Delete current workflow execution and the mutableState record",
			Flags: append(getDBFlags(),
				workflowIDFlag,
				runIDFlag,
				cli.BoolFlag{
					Name:  FlagSkipErrorModeWithAlias,
					Usage: "skip errors when deleting history",
				},
				cli.BoolFlag{
					Name:  FlagRemote,
					Usage: "Executes deletion on server side",
				},
			),
			Action: func(c *cli.Context) {
				AdminDeleteWorkflow(c)
			},
		},
		{
			Name:    "fix_corruption",
			Aliases: []string{"fc"},
			Usage:   "Checks if workflow record is corrupted in database and cleans up",
			Flags: []cli.Flag{
				workflowIDFlag,
				runIDFlag,
				cli.BoolFlag{
					Name:  FlagSkipErrorModeWithAlias,
					Usage: "Skip errors and tries to delete as much as possible from the DB",
				},
			},
			Action: func(c *cli.Context) {
				AdminMaintainCorruptWorkflow(c)
			},
		},
	}
}
// newAdminShardManagementCommands returns the admin shard subcommands:
// describe, list, setRangeID, closeShard, removeTask and timers.
// Each Action only forwards the cli context to the matching Admin* handler.
func newAdminShardManagementCommands() []cli.Command {
	// removeTask and timers declare the very same shard flag.
	shardIDFlag := cli.IntFlag{Name: FlagShardID, Usage: "shardID"}

	describeCmd := cli.Command{
		Name:    "describe",
		Aliases: []string{"d"},
		Usage:   "Describe shard by Id",
		Flags: append(
			getDBFlags(),
			cli.IntFlag{Name: FlagShardID, Usage: "The Id of the shard to describe"},
		),
		Action: func(c *cli.Context) {
			AdminDescribeShard(c)
		},
	}

	listCmd := cli.Command{
		Name:    "list",
		Aliases: []string{"l"},
		Usage:   "List shard distribution",
		Flags: []cli.Flag{
			cli.IntFlag{Name: FlagPageSize, Value: 100, Usage: "Max number of results to return"},
			cli.IntFlag{Name: FlagPageID, Value: 0, Usage: "Option to show results offset from pagesize * page_id"},
			getFormatFlag(),
		},
		Action: func(c *cli.Context) {
			AdminDescribeShardDistribution(c)
		},
	}

	setRangeIDCmd := cli.Command{
		Name:    "setRangeID",
		Aliases: []string{"srid"},
		Usage:   "Force update shard rangeID",
		Flags: append(
			getDBFlags(),
			cli.IntFlag{Name: FlagShardIDWithAlias, Usage: "ID of the shard to reset"},
			cli.Int64Flag{Name: FlagRangeIDWithAlias, Usage: "new shard rangeID"},
		),
		Action: func(c *cli.Context) {
			AdminSetShardRangeID(c)
		},
	}

	closeShardCmd := cli.Command{
		Name:    "closeShard",
		Aliases: []string{"clsh"},
		Usage:   "close a shard given a shard id",
		Flags: []cli.Flag{
			cli.IntFlag{Name: FlagShardID, Usage: "ShardID for the cadence cluster to manage"},
		},
		Action: func(c *cli.Context) {
			AdminCloseShard(c)
		},
	}

	removeTaskCmd := cli.Command{
		Name:    "removeTask",
		Aliases: []string{"rmtk"},
		Usage:   "remove a task based on shardID, task type, taskID, and task visibility timestamp",
		Flags: []cli.Flag{
			shardIDFlag,
			cli.Int64Flag{Name: FlagTaskID, Usage: "taskID"},
			cli.IntFlag{
				Name:  FlagTaskType,
				Usage: "task type: 2 (transfer task), 3 (timer task), 4 (replication task) or 6 (cross-cluster task)",
			},
			cli.Int64Flag{
				Name:  FlagTaskVisibilityTimestamp,
				Usage: "task visibility timestamp in nano (required for removing timer task)",
			},
			cli.StringFlag{
				Name:  FlagCluster,
				Usage: "target cluster of the task (required for removing cross-cluster task)",
			},
		},
		Action: func(c *cli.Context) {
			AdminRemoveTask(c)
		},
	}

	timersCmd := cli.Command{
		Name:  "timers",
		Usage: "get scheduled timers for a given time range",
		Flags: append(getDBFlags(),
			shardIDFlag,
			cli.IntFlag{
				Name:  FlagPageSize,
				Value: 500,
				Usage: "page size used to query db executions table",
			},
			// Default window: from "now" until 24 hours later, both in UTC,
			// computed when this command table is built.
			cli.StringFlag{
				Name:  FlagStartDate,
				Value: time.Now().UTC().Format(time.RFC3339),
				Usage: "start date",
			},
			cli.StringFlag{
				Name:  FlagEndDate,
				Value: time.Now().UTC().Add(24 * time.Hour).Format(time.RFC3339),
				Usage: "end date",
			},
			cli.StringFlag{Name: FlagDomainID, Usage: "filter tasks by DomainID"},
			cli.IntSliceFlag{
				Name:  FlagTimerType,
				Value: &cli.IntSlice{-1},
				Usage: "timer types: 0 - DecisionTimeoutTask, 1 - TaskTypeActivityTimeout, " +
					"2 - TaskTypeUserTimer, 3 - TaskTypeWorkflowTimeout, 4 - TaskTypeDeleteHistoryEvent, " +
					"5 - TaskTypeActivityRetryTimer, 6 - TaskTypeWorkflowBackoffTimer",
			},
			cli.BoolFlag{Name: FlagPrintJSON, Usage: "print raw json data instead of histogram"},
			cli.BoolFlag{Name: FlagSkipErrorMode, Usage: "skip errors"},
			cli.StringFlag{Name: FlagInputFile, Usage: "file to use, will not connect to persistence"},
			cli.StringFlag{
				Name:  FlagDateFormat,
				Usage: "create buckets using time format. Use Go reference time: Mon Jan 2 15:04:05 MST 2006. If set, --" + FlagBucketSize + " is ignored",
			},
			cli.StringFlag{
				Name:  FlagBucketSize,
				Value: "hour",
				Usage: "group timers by time bucket. Available: day, hour, minute, second",
			},
			cli.IntFlag{
				Name:  FlagShardMultiplier,
				Value: 16384,
				Usage: "multiply timer counters for histogram",
			},
		),
		Action: func(c *cli.Context) {
			AdminTimers(c)
		},
	}

	return []cli.Command{
		describeCmd,
		listCmd,
		setRangeIDCmd,
		closeShardCmd,
		removeTaskCmd,
		timersCmd,
	}
}
// newAdminHistoryHostCommands returns the admin history-host subcommands:
// describe and getshard.
func newAdminHistoryHostCommands() []cli.Command {
	// Both subcommands accept the same WorkflowID flag.
	workflowIDFlag := cli.StringFlag{Name: FlagWorkflowIDWithAlias, Usage: "WorkflowID"}

	describeCmd := cli.Command{
		Name:    "describe",
		Aliases: []string{"desc"},
		Usage:   "Describe internal information of history host",
		Flags: []cli.Flag{
			workflowIDFlag,
			cli.StringFlag{Name: FlagHistoryAddressWithAlias, Usage: "History Host address(IP:PORT)"},
			cli.IntFlag{Name: FlagShardIDWithAlias, Usage: "ShardID"},
			cli.BoolFlag{Name: FlagPrintFullyDetailWithAlias, Usage: "Print fully detail"},
		},
		Action: func(c *cli.Context) {
			AdminDescribeHistoryHost(c)
		},
	}

	getShardCmd := cli.Command{
		Name:    "getshard",
		Aliases: []string{"gsh"},
		Usage:   "Get shardID for a workflowID",
		Flags: []cli.Flag{
			workflowIDFlag,
			cli.IntFlag{
				Name:  FlagNumberOfShards,
				Usage: "NumberOfShards for the cadence cluster(see config for numHistoryShards)",
			},
		},
		Action: func(c *cli.Context) {
			AdminGetShardID(c)
		},
	}

	return []cli.Command{describeCmd, getShardCmd}
}
// newAdminDomainCommands returns the admin domain subcommands: register,
// update, deprecate, describe, getdomainidorname and list.
//
// register/update/deprecate/describe build their domain CLI with
// newDomainCLI(c, true), while list uses newDomainCLI(c, false).
func newAdminDomainCommands() []cli.Command {
	registerCmd := cli.Command{
		Name:    "register",
		Aliases: []string{"re"},
		Usage:   "Register workflow domain",
		Flags:   adminRegisterDomainFlags,
		Action: func(c *cli.Context) {
			newDomainCLI(c, true).RegisterDomain(c)
		},
	}

	updateCmd := cli.Command{
		Name:    "update",
		Aliases: []string{"up", "u"},
		Usage:   "Update existing workflow domain",
		Flags:   adminUpdateDomainFlags,
		Action: func(c *cli.Context) {
			newDomainCLI(c, true).UpdateDomain(c)
		},
	}

	deprecateCmd := cli.Command{
		Name:    "deprecate",
		Aliases: []string{"dep"},
		Usage:   "Deprecate existing workflow domain",
		Flags:   adminDeprecateDomainFlags,
		Action: func(c *cli.Context) {
			newDomainCLI(c, true).DeprecateDomain(c)
		},
	}

	describeCmd := cli.Command{
		Name:    "describe",
		Aliases: []string{"desc"},
		Usage:   "Describe existing workflow domain",
		Flags:   adminDescribeDomainFlags,
		Action: func(c *cli.Context) {
			newDomainCLI(c, true).DescribeDomain(c)
		},
	}

	getIDOrNameCmd := cli.Command{
		Name:    "getdomainidorname",
		Aliases: []string{"getdn"},
		Usage:   "Get domainID or domainName",
		Flags: append(getDBFlags(),
			cli.StringFlag{Name: FlagDomain, Usage: "DomainName"},
			cli.StringFlag{Name: FlagDomainID, Usage: "Domain ID(uuid)"},
		),
		Action: func(c *cli.Context) {
			AdminGetDomainIDOrName(c)
		},
	}

	listCmd := cli.Command{
		Name:    "list",
		Aliases: []string{"l"},
		Usage:   "List all domains in the cluster",
		Flags: []cli.Flag{
			cli.IntFlag{Name: FlagPageSizeWithAlias, Value: 10, Usage: "Result page size"},
			cli.BoolFlag{
				Name:  FlagAllWithAlias,
				Usage: "List all domains, by default only domains in REGISTERED status are listed",
			},
			cli.BoolFlag{
				Name:  FlagDeprecatedWithAlias,
				Usage: "List deprecated domains only, by default only domains in REGISTERED status are listed",
			},
			cli.StringFlag{
				Name:  FlagPrefix,
				Value: "",
				Usage: "List domains that are matching to the given prefix",
			},
			cli.BoolFlag{Name: FlagPrintFullyDetailWithAlias, Usage: "Print full domain detail"},
			cli.BoolFlag{
				Name:  FlagPrintJSONWithAlias,
				Usage: "Print in raw json format (DEPRECATED: instead use --format json)",
			},
			getFormatFlag(),
		},
		Action: func(c *cli.Context) {
			newDomainCLI(c, false).ListDomains(c)
		},
	}

	return []cli.Command{
		registerCmd,
		updateCmd,
		deprecateCmd,
		describeCmd,
		getIDOrNameCmd,
		listCmd,
	}
}
// newAdminKafkaCommands returns the admin kafka subcommands: parse and
// rereplicate.
func newAdminKafkaCommands() []cli.Command {
	// TODO: is this command still needed now that kafka replication has been deprecated?
	parseCmd := cli.Command{
		Name:    "parse",
		Aliases: []string{"par"},
		Usage:   "Parse replication tasks from kafka messages",
		Flags: []cli.Flag{
			cli.StringFlag{
				Name:  FlagInputFileWithAlias,
				Usage: "Input file to use, if not present assumes piping",
			},
			cli.StringFlag{
				Name:  FlagWorkflowIDWithAlias,
				Usage: "WorkflowID, if not provided then no filters by WorkflowID are applied",
			},
			cli.StringFlag{
				Name:  FlagRunIDWithAlias,
				Usage: "RunID, if not provided then no filters by RunID are applied",
			},
			cli.StringFlag{
				Name:  FlagOutputFilenameWithAlias,
				Usage: "Output file to write to, if not provided output is written to stdout",
			},
			cli.BoolFlag{
				Name:  FlagSkipErrorModeWithAlias,
				Usage: "Skip errors in parsing messages",
			},
			cli.BoolFlag{
				Name:  FlagHeadersModeWithAlias,
				Usage: "Output headers of messages in format: DomainID, WorkflowID, RunID, FirstEventID, NextEventID",
			},
			cli.IntFlag{
				Name:  FlagMessageTypeWithAlias,
				Value: 0,
				Usage: "Kafka message type (0: replicationTasks; 1: visibility)",
			},
		},
		Action: func(c *cli.Context) {
			AdminKafkaParse(c)
		},
	}

	// TODO: move this command to be a subcommand of admin workflow
	rereplicateCmd := cli.Command{
		Name:    "rereplicate",
		Aliases: []string{"rrp"},
		Usage:   "Rereplicate replication tasks from history tables",
		Flags: []cli.Flag{
			cli.StringFlag{
				Name:  FlagSourceCluster,
				Usage: "Name of source cluster to resend the replication task",
			},
			cli.StringFlag{Name: FlagDomainID, Usage: "DomainID"},
			cli.StringFlag{Name: FlagWorkflowIDWithAlias, Usage: "WorkflowID"},
			cli.StringFlag{Name: FlagRunIDWithAlias, Usage: "RunID"},
			cli.Int64Flag{
				Name:  FlagMaxEventID,
				Usage: "MaxEventID Optional, default to all events",
			},
			cli.StringFlag{
				Name:  FlagEndEventVersion,
				Usage: "Workflow end event version, required if MaxEventID is specified",
			},
		},
		Action: func(c *cli.Context) {
			AdminRereplicate(c)
		},
	}

	return []cli.Command{parseCmd, rereplicateCmd}
}
// newAdminElasticSearchCommands returns the admin ElasticSearch subcommands:
// catIndex, index, delete and report.
// Each Action only forwards the cli context to the matching handler.
func newAdminElasticSearchCommands() []cli.Command {
	// Flags declared identically by several subcommands are defined once and
	// reused by value.
	urlFlag := cli.StringFlag{Name: FlagURL, Usage: "URL of ElasticSearch cluster"}
	indexFlag := cli.StringFlag{Name: FlagIndex, Usage: "ElasticSearch target index"}
	batchSizeFlag := cli.IntFlag{
		Name:  FlagBatchSizeWithAlias,
		Usage: "Optional batch size of actions for bulk operations",
		Value: 1000,
	}

	return []cli.Command{
		{
			Name:    "catIndex",
			Aliases: []string{"cind"},
			Usage:   "Cat Indices on ElasticSearch",
			Flags: []cli.Flag{
				urlFlag,
				getFormatFlag(),
			},
			Action: func(c *cli.Context) {
				AdminCatIndices(c)
			},
		},
		{
			Name:    "index",
			Aliases: []string{"ind"},
			Usage:   "Index docs on ElasticSearch",
			Flags: []cli.Flag{
				urlFlag,
				indexFlag,
				cli.StringFlag{
					Name:  FlagInputFileWithAlias,
					Usage: "Input file of indexer.Message in json format, separated by newline",
				},
				batchSizeFlag,
			},
			Action: func(c *cli.Context) {
				AdminIndex(c)
			},
		},
		{
			Name:    "delete",
			Aliases: []string{"del"},
			Usage:   "Delete docs on ElasticSearch",
			Flags: []cli.Flag{
				urlFlag,
				indexFlag,
				cli.StringFlag{
					Name: FlagInputFileWithAlias,
					// Help text fix: the expected input is the *table* format
					// of the workflow list output (previously "tale format").
					Usage: "Input file name. Redirect cadence wf list result (with table format) to a file and use as delete input. " +
						"First line should be table header like WORKFLOW TYPE | WORKFLOW ID | RUN ID | ...",
				},
				batchSizeFlag,
				cli.IntFlag{
					Name:  FlagRPS,
					Usage: "Optional batch request rate per second",
					Value: 30,
				},
			},
			Action: func(c *cli.Context) {
				AdminDelete(c)
			},
		},
		{
			Name:    "report",
			Aliases: []string{"rep"},
			Usage:   "Generate Report by Aggregation functions on ElasticSearch",
			Flags: []cli.Flag{
				urlFlag,
				indexFlag,
				cli.StringFlag{Name: FlagListQuery, Usage: "SQL query of the report"},
				cli.StringFlag{Name: FlagOutputFormat, Usage: "Additional output format (html or csv)"},
				cli.StringFlag{Name: FlagOutputFilename, Usage: "Additional output filename with path"},
			},
			Action: func(c *cli.Context) {
				GenerateReport(c)
			},
		},
	}
}
// newAdminTaskListCommands returns the admin tasklist subcommands: describe
// and list.
func newAdminTaskListCommands() []cli.Command {
	describeCmd := cli.Command{
		Name:    "describe",
		Aliases: []string{"desc"},
		Usage:   "Describe pollers and status information of tasklist",
		Flags: []cli.Flag{
			cli.StringFlag{Name: FlagTaskListWithAlias, Usage: "TaskList description"},
			cli.StringFlag{
				Name:  FlagTaskListTypeWithAlias,
				Usage: "Optional TaskList type [decision|activity]",
				Value: "decision",
			},
		},
		Action: func(c *cli.Context) {
			AdminDescribeTaskList(c)
		},
	}

	listCmd := cli.Command{
		Name:    "list",
		Aliases: []string{"l"},
		Usage:   "List active tasklist under a domain",
		Flags: []cli.Flag{
			cli.StringFlag{Name: FlagDomainWithAlias, Usage: "Required Domain name"},
		},
		Action: func(c *cli.Context) {
			AdminListTaskList(c)
		},
	}

	return []cli.Command{describeCmd, listCmd}
}
// newAdminClusterCommands returns the admin cluster subcommands:
// add-search-attr, describe, failover, failover_fast and rebalance.
// failover and rebalance carry no Action of their own; they expose the
// subcommands built by newAdminFailoverCommands and newAdminRebalanceCommands.
func newAdminClusterCommands() []cli.Command {
	addSearchAttrCmd := cli.Command{
		Name:    "add-search-attr",
		Aliases: []string{"asa"},
		Usage:   "whitelist search attribute",
		Flags: []cli.Flag{
			cli.StringFlag{
				Name:  FlagSearchAttributesKey,
				Usage: "Search Attribute key to be whitelisted",
			},
			cli.IntFlag{
				Name:  FlagSearchAttributesType,
				Usage: "Search Attribute value type. [0:String, 1:Keyword, 2:Int, 3:Double, 4:Bool, 5:Datetime]",
				Value: -1,
			},
			cli.StringFlag{
				Name:  FlagSecurityTokenWithAlias,
				Usage: "Optional token for security check",
			},
		},
		Action: func(c *cli.Context) {
			AdminAddSearchAttribute(c)
		},
	}

	describeCmd := cli.Command{
		Name:    "describe",
		Aliases: []string{"d"},
		Usage:   "Describe cluster information",
		Action: func(c *cli.Context) {
			AdminDescribeCluster(c)
		},
	}

	failoverCmd := cli.Command{
		Name:        "failover",
		Aliases:     []string{"fo"},
		Usage:       "Failover domains with domain data IsManagedByCadence=true to target cluster",
		Subcommands: newAdminFailoverCommands(),
	}

	failoverFastCmd := cli.Command{
		Name:    "failover_fast",
		Aliases: []string{"fof"},
		Usage:   "Failover domains with domain data IsManagedByCadence=true to target cluster",
		Flags: []cli.Flag{
			cli.StringFlag{Name: FlagTargetClusterWithAlias, Usage: "Target active cluster name"},
		},
		Action: func(c *cli.Context) {
			newDomainCLI(c, false).FailoverDomains(c)
		},
	}

	rebalanceCmd := cli.Command{
		Name:        "rebalance",
		Aliases:     []string{"rb"},
		Usage:       "Rebalance the domains active cluster",
		Subcommands: newAdminRebalanceCommands(),
	}

	return []cli.Command{
		addSearchAttrCmd,
		describeCmd,
		failoverCmd,
		failoverFastCmd,
		rebalanceCmd,
	}
}
// getDLQFlags returns a freshly allocated list of the flags shared by the DLQ
// read, purge and merge subcommands: shards, DLQ type (default "history"),
// source cluster and last message ID.
func getDLQFlags() []cli.Flag {
	shardsFlag := cli.StringFlag{
		Name:  FlagShards,
		Usage: "Comma separated shard IDs or inclusive ranges. Example: \"2,5-6,10\". Alternatively, feed one shard ID per line via STDIN.",
	}
	dlqTypeFlag := cli.StringFlag{
		Name:  FlagDLQTypeWithAlias,
		Value: "history",
		Usage: "Type of DLQ to manage. (Options: domain, history)",
	}
	sourceClusterFlag := cli.StringFlag{
		Name:  FlagSourceCluster,
		Usage: "The cluster where the task is generated",
	}
	lastMessageIDFlag := cli.IntFlag{
		Name:  FlagLastMessageIDWithAlias,
		Usage: "The upper boundary of the read message",
	}

	return []cli.Flag{shardsFlag, dlqTypeFlag, sourceClusterFlag, lastMessageIDFlag}
}
// newAdminDLQCommands returns the admin DLQ subcommands: count, read, purge
// and merge. read, purge and merge start from the common getDLQFlags set.
func newAdminDLQCommands() []cli.Command {
	countCmd := cli.Command{
		Name:    "count",
		Aliases: []string{"c"},
		Usage:   "Count DLQ Messages",
		Flags: []cli.Flag{
			getFormatFlag(),
			cli.StringFlag{
				Name:  FlagDLQTypeWithAlias,
				Value: "history",
				Usage: "Type of DLQ to manage. (Options: domain, history)",
			},
			cli.BoolFlag{
				Name:  FlagForce,
				Usage: "Force fetch latest counts (will put additional stress on DB)",
			},
		},
		Action: func(c *cli.Context) {
			AdminCountDLQMessages(c)
		},
	}

	readCmd := cli.Command{
		Name:    "read",
		Aliases: []string{"r"},
		Usage:   "Read DLQ Messages",
		Flags: append(getDLQFlags(),
			cli.IntFlag{Name: FlagMaxMessageCountWithAlias, Usage: "Max message size to fetch"},
			getFormatFlag(),
		),
		Action: func(c *cli.Context) {
			AdminGetDLQMessages(c)
		},
	}

	purgeCmd := cli.Command{
		Name:    "purge",
		Aliases: []string{"p"},
		Usage:   "Delete DLQ messages with equal or smaller ids than the provided task id",
		Flags:   getDLQFlags(),
		Action: func(c *cli.Context) {
			AdminPurgeDLQMessages(c)
		},
	}

	mergeCmd := cli.Command{
		Name:    "merge",
		Aliases: []string{"m"},
		Usage:   "Merge DLQ messages with equal or smaller ids than the provided task id",
		Flags:   getDLQFlags(),
		Action: func(c *cli.Context) {
			AdminMergeDLQMessages(c)
		},
	}

	return []cli.Command{countCmd, readCmd, purgeCmd, mergeCmd}
}
// newAdminQueueCommands returns the admin queue subcommands: reset and
// describe. Both take the flags produced by getQueueCommandFlags.
func newAdminQueueCommands() []cli.Command {
	resetCmd := cli.Command{
		Name:  "reset",
		Usage: "reset processing queue states for transfer or timer queue processor",
		Flags: getQueueCommandFlags(),
		Action: func(c *cli.Context) {
			AdminResetQueue(c)
		},
	}

	describeCmd := cli.Command{
		Name:    "describe",
		Aliases: []string{"desc"},
		Usage:   "describe processing queue states for transfer or timer queue processor",
		Flags:   getQueueCommandFlags(),
		Action: func(c *cli.Context) {
			AdminDescribeQueue(c)
		},
	}

	return []cli.Command{resetCmd, describeCmd}
}
// newAdminAsyncQueueCommands returns the admin async workflow queue
// subcommands: get and update. Both require a domain name; update also
// requires the configuration as JSON.
func newAdminAsyncQueueCommands() []cli.Command {
	// The required domain flag is identical for both subcommands.
	domainFlag := cli.StringFlag{
		Name:     FlagDomain,
		Usage:    "domain name",
		Required: true,
	}
	configJSONFlag := cli.StringFlag{
		Name:     FlagJSON,
		Usage:    "AsyncWorkflowConfiguration in json format. Schema can be found in https://github.com/uber/cadence/blob/master/common/types/admin.go",
		Required: true,
	}

	getCmd := cli.Command{
		Name:  "get",
		Usage: "get async workflow queue configuration of a domain",
		Flags: []cli.Flag{domainFlag},
		Action: func(c *cli.Context) {
			AdminGetAsyncWFConfig(c)
		},
	}

	updateCmd := cli.Command{
		Name:  "update",
		Usage: "upsert async workflow queue configuration of a domain",
		Flags: []cli.Flag{domainFlag, configJSONFlag},
		Action: func(c *cli.Context) {
			AdminUpdateAsyncWFConfig(c)
		},
	}

	return []cli.Command{getCmd, updateCmd}
}
// newDBCommands returns the admin db subcommands: scan, unsupported-workflow,
// clean and decode_thrift.
//
// The scan-type, invariant-collection and verbose flags are shared by scan and
// clean. The collection flag defaults to every collection reported by
// invariant.CollectionStrings().
func newDBCommands() []cli.Command {
	var collections cli.StringSlice = invariant.CollectionStrings()

	scanFlag := cli.StringFlag{
		Name:     FlagScanType,
		Usage:    "Scan type to use: " + strings.Join(executions.ScanTypeStrings(), ", "),
		Required: true,
	}

	collectionsFlag := cli.StringSliceFlag{
		Name:  FlagInvariantCollection,
		Usage: "Scan collection type to use: " + strings.Join(collections, ", "),
		Value: &collections,
	}

	verboseFlag := cli.BoolFlag{
		Name:     FlagVerbose,
		Usage:    "verbose output",
		Required: false,
	}

	return []cli.Command{
		{
			Name:  "scan",
			Usage: "scan executions in database and detect corruptions",
			Flags: append(getDBFlags(),
				cli.IntFlag{
					Name:     FlagNumberOfShards,
					Usage:    "NumberOfShards for the cadence cluster (see config for numHistoryShards)",
					Required: true,
				},
				scanFlag,
				collectionsFlag,
				cli.StringFlag{
					Name:  FlagInputFileWithAlias,
					Usage: "Input file of executions to scan in JSON format {\"DomainID\":\"x\",\"WorkflowID\":\"x\",\"RunID\":\"x\"} separated by a newline",
				},
				verboseFlag,
			),
			Action: func(c *cli.Context) {
				AdminDBScan(c)
			},
		},
		{
			Name:  "unsupported-workflow",
			Usage: "use this command when upgrade the Cadence server from version less than 0.16.0. This scan database and detect unsupported workflow type.",
			Flags: append(getDBFlags(),
				cli.IntFlag{
					Name: FlagRPS,
					// Help text fix: this previously carried the description
					// of the NumberOfShards flag.
					// NOTE(review): wording assumes the handler uses this
					// value as a requests-per-second limit — confirm.
					Usage: "Rate limit in requests per second for the scan",
					Value: 1000,
				},
				cli.StringFlag{
					Name:  FlagOutputFilenameWithAlias,
					Usage: "Output file to write to, if not provided output is written to stdout",
				},
				// NOTE(review): both shard bounds are Required yet also carry a
				// default Value; left unchanged — confirm which is intended.
				cli.IntFlag{
					Name:     FlagLowerShardBound,
					Usage:    "FlagLowerShardBound for the start shard to scan. (Default: 0)",
					Value:    0,
					Required: true,
				},
				cli.IntFlag{
					Name: FlagUpperShardBound,
					// Help text fix: previously named FlagLowerShardBound.
					Usage:    "FlagUpperShardBound for the end shard to scan. (Default: 16383)",
					Value:    16383,
					Required: true,
				},
			),
			Action: func(c *cli.Context) {
				AdminDBScanUnsupportedWorkflow(c)
			},
		},
		{
			Name:  "clean",
			Usage: "clean up corrupted workflows",
			Flags: append(getDBFlags(),
				scanFlag,
				collectionsFlag,
				cli.StringFlag{
					Name:  FlagInputFileWithAlias,
					Usage: "Input file of execution to clean in JSON format. Use `scan` command to generate list of executions.",
				},
				verboseFlag,
			),
			Action: func(c *cli.Context) {
				AdminDBClean(c)
			},
		},
		{
			Name:  "decode_thrift",
			Usage: "decode thrift object, print into JSON if the data is matching with any supported struct",
			Flags: []cli.Flag{
				cli.StringFlag{
					Name:   FlagInputWithAlias,
					EnvVar: "Input",
					Usage:  "Input of Thrift encoded data structure.",
				},
				cli.StringFlag{
					Name:  FlagInputEncodingWithAlias,
					Usage: "Encoding of the input: [hex|base64] (Default: hex)",
				},
			},
			Action: func(c *cli.Context) {
				AdminDBDataDecodeThrift(c)
			},
		},
	}
}
// getQueueCommandFlags returns a freshly allocated list of the flags used by
// the admin queue subcommands: shard ID, cluster and queue type.
func getQueueCommandFlags() []cli.Flag {
	shardIDFlag := cli.IntFlag{
		Name:  FlagShardIDWithAlias,
		Usage: "shardID",
	}
	clusterFlag := cli.StringFlag{
		Name:  FlagCluster,
		Usage: "cluster the task processor is responsible for",
	}
	queueTypeFlag := cli.IntFlag{
		Name:  FlagQueueType,
		Usage: "queue type: 2 (transfer queue), 3 (timer queue) or 6 (cross-cluster queue)",
	}

	return []cli.Flag{shardIDFlag, clusterFlag, queueTypeFlag}
}
// newAdminFailoverCommands returns the subcommands of the failover command
// group: start, pause, resume, query, abort, rollback and list.
// Each Action only forwards the cli context to the matching AdminFailover*
// handler.
func newAdminFailoverCommands() []cli.Command {
	// Flags declared identically by several subcommands are defined once and
	// reused by value.
	runIDFlag := cli.StringFlag{
		Name:  FlagRunIDWithAlias,
		Usage: "Optional Failover workflow runID, default is latest runID",
	}
	failoverTimeoutFlag := cli.IntFlag{
		Name: FlagFailoverTimeoutWithAlias,
		// Help text fix: "is define" -> "is defined".
		Usage: "Optional graceful failover timeout in seconds. If this field is defined, the failover will use graceful failover.",
	}
	executionTimeoutFlag := cli.IntFlag{
		Name:  FlagExecutionTimeoutWithAlias,
		Usage: "Optional Failover workflow timeout in seconds",
		Value: defaultFailoverWorkflowTimeoutInSeconds,
	}
	waitTimeFlag := cli.IntFlag{
		Name:  FlagFailoverWaitTimeWithAlias,
		Usage: "Optional Failover wait time after each batch in seconds",
		Value: defaultBatchFailoverWaitTimeInSeconds,
	}
	batchSizeFlag := cli.IntFlag{
		Name:  FlagFailoverBatchSizeWithAlias,
		Usage: "Optional number of domains to failover in one batch",
		Value: defaultBatchFailoverSize,
	}

	// drillFlag builds the failover-drill switch; only the verb in the help
	// text differs between subcommands.
	drillFlag := func(verb string) cli.BoolFlag {
		return cli.BoolFlag{
			Name: FlagFailoverDrillWithAlias,
			Usage: "Optional to " + verb + " failover workflow or failover drill workflow." +
				" The default is normal failover workflow",
		}
	}

	return []cli.Command{
		{
			Name:    "start",
			Aliases: []string{"s"},
			Usage:   "start failover workflow",
			Flags: []cli.Flag{
				cli.StringFlag{Name: FlagTargetClusterWithAlias, Usage: "Target cluster name"},
				cli.StringFlag{Name: FlagSourceClusterWithAlias, Usage: "Source cluster name"},
				failoverTimeoutFlag,
				executionTimeoutFlag,
				waitTimeFlag,
				batchSizeFlag,
				cli.StringSliceFlag{
					Name: FlagFailoverDomains,
					Usage: "Optional domains to failover, eg d1,d2..,dn. " +
						"Only provided domains in source cluster will be failover.",
				},
				cli.IntFlag{
					Name: FlagFailoverDrillWaitTimeWithAlias,
					// Help text fix: the second and third sentences used to be
					// concatenated without a separating space.
					Usage: "Optional failover drill wait time. " +
						"After the wait time, the domains will be reset to original regions. " +
						"This field is required if the cron schedule is specified.",
				},
				cli.StringFlag{
					Name: FlagCronSchedule,
					// Help text fix: "is specific" -> "is specified".
					Usage: "Optional cron schedule on failover drill. Please specify failover drill wait time " +
						"if this field is specified",
				},
			},
			Action: func(c *cli.Context) {
				AdminFailoverStart(c)
			},
		},
		{
			Name:    "pause",
			Aliases: []string{"p"},
			Usage:   "pause failover workflow",
			Flags:   []cli.Flag{runIDFlag, drillFlag("pause")},
			Action: func(c *cli.Context) {
				AdminFailoverPause(c)
			},
		},
		{
			Name:    "resume",
			Aliases: []string{"re"},
			Usage:   "resume paused failover workflow",
			Flags:   []cli.Flag{runIDFlag, drillFlag("resume")},
			Action: func(c *cli.Context) {
				AdminFailoverResume(c)
			},
		},
		{
			Name:    "query",
			Aliases: []string{"q"},
			Usage:   "query failover workflow state",
			Flags:   []cli.Flag{drillFlag("query"), runIDFlag},
			Action: func(c *cli.Context) {
				AdminFailoverQuery(c)
			},
		},
		{
			Name:    "abort",
			Aliases: []string{"a"},
			Usage:   "abort failover workflow",
			Flags: []cli.Flag{
				runIDFlag,
				cli.StringFlag{Name: FlagReasonWithAlias, Usage: "Optional reason why abort"},
				drillFlag("abort"),
			},
			Action: func(c *cli.Context) {
				AdminFailoverAbort(c)
			},
		},
		{
			Name:    "rollback",
			Aliases: []string{"ro"},
			Usage:   "rollback failover workflow",
			Flags: []cli.Flag{
				runIDFlag,
				failoverTimeoutFlag,
				executionTimeoutFlag,
				waitTimeFlag,
				batchSizeFlag,
			},
			Action: func(c *cli.Context) {
				AdminFailoverRollback(c)
			},
		},
		{
			Name:    "list",
			Aliases: []string{"l"},
			Usage:   "list failover workflow runs closed/open. This is just a simplified list cmd",
			Flags: []cli.Flag{
				cli.BoolFlag{
					Name:  FlagOpenWithAlias,
					Usage: "List for open workflow executions, default is to list for closed ones",
				},
				cli.IntFlag{
					Name:  FlagPageSizeWithAlias,
					Value: 10,
					Usage: "Result page size",
				},
				cli.StringFlag{
					Name:  FlagWorkflowIDWithAlias,
					Usage: "Ignore this. It is a dummy flag which will be forced overwrite",
				},
				drillFlag("query"),
			},
			Action: func(c *cli.Context) {
				AdminFailoverList(c)
			},
		},
	}
}
// newAdminRebalanceCommands returns the subcommands of the rebalance command
// group: start and list.
func newAdminRebalanceCommands() []cli.Command {
	startCmd := cli.Command{
		Name:    "start",
		Aliases: []string{"s"},
		Usage:   "start rebalance workflow",
		Flags:   []cli.Flag{},
		Action: func(c *cli.Context) {
			AdminRebalanceStart(c)
		},
	}

	listCmd := cli.Command{
		Name:    "list",
		Aliases: []string{"l"},
		Usage:   "list rebalance workflow runs closed/open.",
		Flags: []cli.Flag{
			cli.BoolFlag{
				Name:  FlagOpenWithAlias,
				Usage: "List for open workflow executions, default is to list for closed ones",
			},
			cli.IntFlag{Name: FlagPageSizeWithAlias, Usage: "Result page size", Value: 10},
		},
		Action: func(c *cli.Context) {
			AdminRebalanceList(c)
		},
	}

	return []cli.Command{startCmd, listCmd}
}
// newAdminConfigStoreCommands returns the dynamic config subcommands: get,
// update, restore, list and listall.
func newAdminConfigStoreCommands() []cli.Command {
	// get and restore accept the same optional, repeatable filter flag.
	filterFlag := cli.StringSliceFlag{
		Name:  FlagDynamicConfigFilter,
		Usage: fmt.Sprintf(`Optional. Can be specified multiple times for multiple filters. ex: --%s '{"Name":"domainName","Value":"global-samples-domain"}'`, FlagDynamicConfigFilter),
	}

	getCmd := cli.Command{
		Name:    "get",
		Aliases: []string{"g"},
		Usage:   "Get Dynamic Config Value",
		Flags: []cli.Flag{
			cli.StringFlag{
				Name:     FlagDynamicConfigName,
				Required: true,
				Usage:    "Name of Dynamic Config parameter to get value of",
			},
			filterFlag,
		},
		Action: func(c *cli.Context) {
			AdminGetDynamicConfig(c)
		},
	}

	updateCmd := cli.Command{
		Name:    "update",
		Aliases: []string{"u"},
		Usage:   "Update Dynamic Config Value",
		Flags: []cli.Flag{
			cli.StringFlag{
				Name:     FlagDynamicConfigName,
				Required: true,
				Usage:    "Name of Dynamic Config parameter to update value of",
			},
			cli.StringSliceFlag{
				Name:     FlagDynamicConfigValue,
				Required: true,
				Usage:    fmt.Sprintf(`Can be specified multiple times for multiple values. ex: --%s '{"Value":true,"Filters":[]}'`, FlagDynamicConfigValue),
			},
		},
		Action: func(c *cli.Context) {
			AdminUpdateDynamicConfig(c)
		},
	}

	restoreCmd := cli.Command{
		Name:    "restore",
		Aliases: []string{"r"},
		Usage:   "Restore Dynamic Config Value",
		Flags: []cli.Flag{
			cli.StringFlag{
				Name:     FlagDynamicConfigName,
				Required: true,
				Usage:    "Name of Dynamic Config parameter to restore",
			},
			filterFlag,
		},
		Action: func(c *cli.Context) {
			AdminRestoreDynamicConfig(c)
		},
	}

	listCmd := cli.Command{
		Name:    "list",
		Aliases: []string{"l"},
		Usage:   "List Dynamic Config Value",
		Flags:   []cli.Flag{},
		Action: func(c *cli.Context) {
			AdminListDynamicConfig(c)
		},
	}

	listAllCmd := cli.Command{
		Name:    "listall",
		Aliases: []string{"la"},
		Usage:   "List all available configuration keys",
		Flags:   []cli.Flag{getFormatFlag()},
		Action: func(c *cli.Context) {
			AdminListConfigKeys(c)
		},
	}

	return []cli.Command{getCmd, updateCmd, restoreCmd, listCmd, listAllCmd}
}
// newAdminIsolationGroupCommands returns the isolation-group subcommands:
// get-global, update-global, get-domain and update-domain.
// The two update commands accept the same JSON / set-drains /
// remove-all-drains flags; the domain variants additionally require a domain.
func newAdminIsolationGroupCommands() []cli.Command {
	// Flags declared identically by several subcommands are defined once and
	// reused by value. All of them are optional except the domain flag.
	formatFlag := cli.StringFlag{
		Name:  FlagFormat,
		Usage: `output format`,
	}
	domainFlag := cli.StringFlag{
		Name:     FlagDomain,
		Usage:    `The domain to operate on`,
		Required: true,
	}
	jsonFlag := cli.StringFlag{
		Name: FlagJSON,
		// Help text fix: the example used ':' between the two object members,
		// which is not valid JSON; members are separated by ','.
		Usage: `the configurations to upsert: eg: [{"Name": "zone-1", "State": 2}]. To remove groups, specify an empty configuration`,
	}
	setDrainsFlag := cli.StringSliceFlag{
		Name:  FlagIsolationGroupSetDrains,
		Usage: "Use to upsert the configuration for all drains. Note that this is an upsert operation and will overwrite all existing configuration",
	}
	removeAllDrainsFlag := cli.BoolFlag{
		Name:  FlagIsolationGroupsRemoveAllDrains,
		Usage: "Removes all drains",
	}

	return []cli.Command{
		{
			Name:  "get-global",
			Usage: "gets the global isolation groups",
			Flags: []cli.Flag{formatFlag},
			Action: func(c *cli.Context) {
				AdminGetGlobalIsolationGroups(c)
			},
		},
		{
			Name:  "update-global",
			Usage: "sets the global isolation groups",
			Flags: []cli.Flag{jsonFlag, setDrainsFlag, removeAllDrainsFlag},
			Action: func(c *cli.Context) {
				AdminUpdateGlobalIsolationGroups(c)
			},
		},
		{
			Name:  "get-domain",
			Usage: "gets the domain isolation groups",
			Flags: []cli.Flag{domainFlag, formatFlag},
			Action: func(c *cli.Context) {
				AdminGetDomainIsolationGroups(c)
			},
		},
		{
			Name:  "update-domain",
			Usage: "sets the domain isolation groups",
			Flags: []cli.Flag{domainFlag, jsonFlag, setDrainsFlag, removeAllDrainsFlag},
			Action: func(c *cli.Context) {
				AdminUpdateDomainIsolationGroups(c)
			},
		},
	}
}