tools/cli/admin_commands.go (501 lines of code) (raw):

// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cli

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"sort"
	"strconv"
	"time"

	"github.com/urfave/cli"

	"github.com/uber/cadence/.gen/go/shared"
	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/codec"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/common/types/mapper/thrift"
)

const (
	// tableRenderSize is the number of rows rendered per page by
	// AdminDescribeShardDistribution.
	tableRenderSize = 10
)

// NOTE(review): the commands in this file call ErrorAndExit and then keep
// using the values that would be invalid on error, so they presumably rely on
// ErrorAndExit terminating the process — confirm against its definition.

// AdminShowWorkflow reads the raw history blobs of the branch identified by
// the tree ID / branch ID flags, prints every event as JSON grouped by batch,
// and, when an output file name is given, also writes all events to that file
// as a single JSON array.
func AdminShowWorkflow(c *cli.Context) {
	tid := c.String(FlagTreeID)
	bid := c.String(FlagBranchID)
	sid := c.Int(FlagShardID)
	minEventID := c.Int64(FlagMinEventID)
	maxEventID := c.Int64(FlagMaxEventID)
	outputFileName := c.String(FlagOutputFilename)
	domainName := c.String(FlagDomain)

	if len(tid) == 0 {
		ErrorAndExit("need to specify TreeID/BranchID/ShardID", nil)
		return
	}

	ctx, cancel := newContext(c)
	defer cancel()

	serializer := persistence.NewPayloadSerializer()
	thriftrwEncoder := codec.NewThriftRWEncoder()
	histV2 := initializeHistoryManager(c)
	// Release the history manager the same way AdminDeleteWorkflow does.
	defer histV2.Close()

	branchToken, err := thriftrwEncoder.Encode(&shared.HistoryBranch{
		TreeID:   &tid,
		BranchID: &bid,
	})
	if err != nil {
		ErrorAndExit("encoding branch token err", err)
	}
	resp, err := histV2.ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
		BranchToken: branchToken,
		MinEventID:  minEventID,
		MaxEventID:  maxEventID,
		// One page large enough to hold every requested event ID.
		PageSize:   int(maxEventID - minEventID + 1),
		ShardID:    &sid,
		DomainName: domainName,
	})
	if err != nil {
		ErrorAndExit("ReadHistoryBranch err", err)
	}

	history := resp.HistoryEventBlobs
	if len(history) == 0 {
		ErrorAndExit("no events", nil)
	}

	allEvents := &shared.History{}
	totalSize := 0
	for idx, b := range history {
		totalSize += len(b.Data)
		fmt.Printf("======== batch %v, blob len: %v ======\n", idx+1, len(b.Data))
		internalHistoryBatch, err := serializer.DeserializeBatchEvents(b)
		if err != nil {
			ErrorAndExit("DeserializeBatchEvents err", err)
		}
		historyBatch := thrift.FromHistoryEventArray(internalHistoryBatch)
		allEvents.Events = append(allEvents.Events, historyBatch...)
		for _, e := range historyBatch {
			jsonstr, err := json.Marshal(e)
			if err != nil {
				ErrorAndExit("json.Marshal err", err)
			}
			fmt.Println(string(jsonstr))
		}
	}
	fmt.Printf("======== total batches %v, total blob len: %v ======\n", len(history), totalSize)

	if outputFileName != "" {
		data, err := json.Marshal(allEvents.Events)
		if err != nil {
			ErrorAndExit("Failed to serialize history data.", err)
		}
		if err := ioutil.WriteFile(outputFileName, data, 0666); err != nil {
			ErrorAndExit("Failed to export history data file.", err)
		}
	}
}

// AdminDescribeWorkflow prints the admin description of a workflow execution,
// followed by the decoded branch info of its current history branch and, when
// present, its auto-reset points.
func AdminDescribeWorkflow(c *cli.Context) {
	resp := describeMutableState(c)
	prettyPrintJSONObject(resp)
	if resp == nil {
		return
	}

	msStr := resp.GetMutableStateInDatabase()
	ms := persistence.WorkflowMutableState{}
	if err := json.Unmarshal([]byte(msStr), &ms); err != nil {
		ErrorAndExit("json.Unmarshal err", err)
	}

	currentBranchToken := ms.ExecutionInfo.BranchToken
	if ms.VersionHistories != nil {
		// if VersionHistories is set, then all branch infos are stored in VersionHistories
		currentVersionHistory, err := ms.VersionHistories.GetCurrentVersionHistory()
		if err != nil {
			ErrorAndExit("ms.VersionHistories.GetCurrentVersionHistory err", err)
		}
		currentBranchToken = currentVersionHistory.GetBranchToken()
	}

	branchInfo := shared.HistoryBranch{}
	thriftrwEncoder := codec.NewThriftRWEncoder()
	if err := thriftrwEncoder.Decode(currentBranchToken, &branchInfo); err != nil {
		ErrorAndExit("thriftrwEncoder.Decode err", err)
	}
	prettyPrintJSONObject(branchInfo)

	if ms.ExecutionInfo.AutoResetPoints != nil {
		fmt.Println("auto-reset-points:")
		for _, p := range ms.ExecutionInfo.AutoResetPoints.Points {
			createT := time.Unix(0, p.GetCreatedTimeNano())
			expireT := time.Unix(0, p.GetExpiringTimeNano())
			fmt.Println(p.GetBinaryChecksum(), p.GetRunID(), p.GetFirstDecisionCompletedID(), p.GetResettable(), createT, expireT)
		}
	}
}

// describeMutableState asks the admin service to describe the workflow
// execution selected by the domain, workflow ID and (optional) run ID flags
// and returns the response.
func describeMutableState(c *cli.Context) *types.AdminDescribeWorkflowExecutionResponse {
	adminClient := cFactory.ServerAdminClient(c)

	domain := getRequiredGlobalOption(c, FlagDomain)
	wid := getRequiredOption(c, FlagWorkflowID)
	rid := c.String(FlagRunID)

	ctx, cancel := newContext(c)
	defer cancel()

	resp, err := adminClient.DescribeWorkflowExecution(
		ctx,
		&types.AdminDescribeWorkflowExecutionRequest{
			Domain: domain,
			Execution: &types.WorkflowExecution{
				WorkflowID: wid,
				RunID:      rid,
			},
		},
	)
	if err != nil {
		ErrorAndExit("Get workflow mutableState failed", err)
	}
	return resp
}

// AdminMaintainCorruptWorkflow sends a MaintainCorruptWorkflow request for the
// selected workflow execution to the admin service and returns the error of
// that call.
func AdminMaintainCorruptWorkflow(c *cli.Context) error {
	domainName := getRequiredGlobalOption(c, FlagDomain)
	workflowID := c.String(FlagWorkflowID)
	runID := c.String(FlagRunID)
	skipErrors := c.Bool(FlagSkipErrorMode)

	adminClient := cFactory.ServerAdminClient(c)
	request := &types.AdminMaintainWorkflowRequest{
		Domain: domainName,
		Execution: &types.WorkflowExecution{
			WorkflowID: workflowID,
			RunID:      runID,
		},
		SkipErrors: skipErrors,
	}

	ctx, cancel := newContext(c)
	defer cancel()

	_, err := adminClient.MaintainCorruptWorkflow(ctx, request)
	if err != nil {
		ErrorAndExit("Operation AdminMaintainCorruptWorkflow failed.", err)
	}
	return err
}

// AdminDeleteWorkflow deletes a workflow execution: its history branches, its
// mutable state row and its current-execution row.
func AdminDeleteWorkflow(c *cli.Context) {
	domain := getRequiredGlobalOption(c, FlagDomain)
	wid := getRequiredOption(c, FlagWorkflowID)
	rid := c.String(FlagRunID)
	remote := c.Bool(FlagRemote)
	skipError := c.Bool(FlagSkipErrorMode)

	ctx, cancel := newContext(c)
	defer cancel()

	// With the remote flag, the command runs on the server side using existing
	// APIs. Without it, the command runs directly through DB clients, which is
	// useful if the server is down; however, only a couple of DB clients are
	// supported currently. If the server-side hosts are working, remote is the
	// cleaner approach.
	if remote {
		adminClient := cFactory.ServerAdminClient(c)
		request := &types.AdminDeleteWorkflowRequest{
			Domain: domain,
			Execution: &types.WorkflowExecution{
				WorkflowID: wid,
				RunID:      rid,
			},
			SkipErrors: skipError,
		}
		if _, err := adminClient.DeleteWorkflow(ctx, request); err != nil {
			ErrorAndExit("Operation AdminDeleteWorkflow failed.", err)
		}
		return
	}

	resp := describeMutableState(c)
	msStr := resp.GetMutableStateInDatabase()
	ms := persistence.WorkflowMutableState{}
	err := json.Unmarshal([]byte(msStr), &ms)
	if err != nil {
		ErrorAndExit("json.Unmarshal err", err)
	}
	domainID := ms.ExecutionInfo.DomainID

	shardID := resp.GetShardID()
	shardIDInt, err := strconv.Atoi(shardID)
	if err != nil {
		ErrorAndExit("strconv.Atoi(shardID) err", err)
	}

	histV2 := initializeHistoryManager(c)
	defer histV2.Close()
	exeStore := initializeExecutionStore(c, shardIDInt)

	thriftrwEncoder := codec.NewThriftRWEncoder()
	branchTokens := [][]byte{ms.ExecutionInfo.BranchToken}
	if ms.VersionHistories != nil {
		// if VersionHistories is set, then all branch infos are stored in VersionHistories
		branchTokens = [][]byte{}
		for _, versionHistory := range ms.VersionHistories.ToInternalType().Histories {
			branchTokens = append(branchTokens, versionHistory.BranchToken)
		}
	}

	for _, branchToken := range branchTokens {
		// Decode into a fresh struct each time so fields of a previously
		// decoded branch cannot leak into what is printed for this one.
		branchInfo := shared.HistoryBranch{}
		if err := thriftrwEncoder.Decode(branchToken, &branchInfo); err != nil {
			ErrorAndExit("thriftrwEncoder.Decode err", err)
		}
		fmt.Println("deleting history events for ...")
		prettyPrintJSONObject(branchInfo)
		err := histV2.DeleteHistoryBranch(ctx, &persistence.DeleteHistoryBranchRequest{
			BranchToken: branchToken,
			ShardID:     &shardIDInt,
			DomainName:  domain,
		})
		if err != nil {
			if skipError {
				fmt.Println("failed to delete history, ", err)
			} else {
				ErrorAndExit("DeleteHistoryBranch err", err)
			}
		}
	}

	req := &persistence.DeleteWorkflowExecutionRequest{
		DomainID:   domainID,
		WorkflowID: wid,
		RunID:      rid,
		DomainName: domain,
	}
	err = exeStore.DeleteWorkflowExecution(ctx, req)
	// Report success only when the delete actually succeeded; in skip-error
	// mode a failure is reported and the command carries on.
	switch {
	case err == nil:
		fmt.Println("delete mutableState row successfully")
	case skipError:
		fmt.Println("delete mutableState row failed, ", err)
	default:
		ErrorAndExit("delete mutableState row failed", err)
	}

	deleteCurrentReq := &persistence.DeleteCurrentWorkflowExecutionRequest{
		DomainID:   domainID,
		WorkflowID: wid,
		RunID:      rid,
	}
	err = exeStore.DeleteCurrentWorkflowExecution(ctx, deleteCurrentReq)
	switch {
	case err == nil:
		fmt.Println("delete current row successfully")
	case skipError:
		fmt.Println("delete current row failed, ", err)
	default:
		ErrorAndExit("delete current row failed", err)
	}
}

// AdminGetDomainIDOrName maps a domain ID to its name or a domain name to its
// ID. When both flags are given, the domain ID takes precedence.
func AdminGetDomainIDOrName(c *cli.Context) {
	domainID := c.String(FlagDomainID)
	domainName := c.String(FlagDomain)
	if len(domainID) == 0 && len(domainName) == 0 {
		ErrorAndExit("Need either domainName or domainID", nil)
	}

	domainManager := initializeDomainManager(c)

	ctx, cancel := newContext(c)
	defer cancel()

	if len(domainID) > 0 {
		domain, err := domainManager.GetDomain(ctx, &persistence.GetDomainRequest{ID: domainID})
		if err != nil {
			ErrorAndExit("SelectDomain error", err)
		}
		fmt.Printf("domainName for domainID %v is %v \n", domainID, domain.Info.Name)
		return
	}

	domain, err := domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: domainName})
	if err != nil {
		ErrorAndExit("SelectDomain error", err)
	}
	// Operands follow the format string: the name that was looked up first,
	// then the ID that was found for it.
	fmt.Printf("domainID for domainName %v is %v \n", domainName, domain.Info.ID)
}

// AdminGetShardID prints the history shard that a workflow ID maps to for the
// given number of shards.
func AdminGetShardID(c *cli.Context) {
	wid := getRequiredOption(c, FlagWorkflowID)
	numberOfShards := c.Int(FlagNumberOfShards)

	if numberOfShards <= 0 {
		ErrorAndExit("numberOfShards is required", nil)
		return
	}
	shardID := common.WorkflowIDToHistoryShard(wid, numberOfShards)
	fmt.Printf("ShardID for workflowID: %v is %v \n", wid, shardID)
}

// AdminRemoveTask asks the admin service to remove a task from a shard. Timer
// tasks additionally require the visibility timestamp flag and cross-cluster
// tasks additionally require the cluster flag.
func AdminRemoveTask(c *cli.Context) {
	adminClient := cFactory.ServerAdminClient(c)

	shardID := getRequiredIntOption(c, FlagShardID)
	taskID := getRequiredInt64Option(c, FlagTaskID)
	typeID := getRequiredIntOption(c, FlagTaskType)

	var visibilityTimestamp int64
	if common.TaskType(typeID) == common.TaskTypeTimer {
		visibilityTimestamp = getRequiredInt64Option(c, FlagTaskVisibilityTimestamp)
	}

	var clusterName string
	// The decision depends on the task type, not on the task ID.
	if common.TaskType(typeID) == common.TaskTypeCrossCluster {
		clusterName = getRequiredOption(c, FlagCluster)
	}

	ctx, cancel := newContext(c)
	defer cancel()

	req := &types.RemoveTaskRequest{
		ShardID:             int32(shardID),
		Type:                common.Int32Ptr(int32(typeID)),
		TaskID:              taskID,
		VisibilityTimestamp: common.Int64Ptr(visibilityTimestamp),
		ClusterName:         clusterName,
	}

	err := adminClient.RemoveTask(ctx, req)
	if err != nil {
		ErrorAndExit("Remove task has failed", err)
	}
}

// AdminDescribeShard fetches the shard with the given shard ID through the
// shard manager and prints it as JSON.
func AdminDescribeShard(c *cli.Context) {
	sid := getRequiredIntOption(c, FlagShardID)

	ctx, cancel := newContext(c)
	defer cancel()

	shardManager := initializeShardManager(c)

	getShardReq := &persistence.GetShardRequest{ShardID: sid}
	shard, err := shardManager.GetShard(ctx, getShardReq)
	if err != nil {
		ErrorAndExit("Failed to describe shard.", err)
	}

	prettyPrintJSONObject(shard)
}

// AdminSetShardRangeID sets the range ID of the given shard. Besides the range
// ID, the update increments StolenSinceRenew, clears the owner and refreshes
// the update timestamp.
func AdminSetShardRangeID(c *cli.Context) {
	sid := getRequiredIntOption(c, FlagShardID)
	rid := getRequiredInt64Option(c, FlagRangeID)

	ctx, cancel := newContext(c)
	defer cancel()

	shardManager := initializeShardManager(c)

	getShardResp, err := shardManager.GetShard(ctx, &persistence.GetShardRequest{ShardID: sid})
	if err != nil {
		ErrorAndExit("Failed to get shardInfo.", err)
	}

	// Remember the old range ID before overwriting it: the update is
	// submitted with it as PreviousRangeID.
	previousRangeID := getShardResp.ShardInfo.RangeID
	updatedShardInfo := getShardResp.ShardInfo
	updatedShardInfo.RangeID = rid
	updatedShardInfo.StolenSinceRenew++
	updatedShardInfo.Owner = ""
	updatedShardInfo.UpdatedAt = time.Now()

	if err := shardManager.UpdateShard(ctx, &persistence.UpdateShardRequest{
		PreviousRangeID: previousRangeID,
		ShardInfo:       updatedShardInfo,
	}); err != nil {
		ErrorAndExit("Failed to reset shard rangeID.", err)
	}

	fmt.Printf("Successfully updated rangeID from %v to %v for shard %v.\n", previousRangeID, rid, sid)
}

// AdminCloseShard asks the admin service to close the shard with the given
// shard ID.
func AdminCloseShard(c *cli.Context) {
	adminClient := cFactory.ServerAdminClient(c)
	sid := getRequiredIntOption(c, FlagShardID)

	ctx, cancel := newContext(c)
	defer cancel()

	req := &types.CloseShardRequest{}
	req.ShardID = int32(sid)

	err := adminClient.CloseShard(ctx, req)
	if err != nil {
		ErrorAndExit("Close shard task has failed", err)
	}
}

// ShardRow is one rendered table row of AdminDescribeShardDistribution: a
// shard ID and the identity reported for it.
type ShardRow struct {
	ShardID  int32  `header:"ShardID"`
	Identity string `header:"Identity"`
}

// AdminDescribeShardDistribution prints the shard distribution returned by the
// admin service as a table, tableRenderSize rows at a time, ordered by shard
// ID. After each full page showNextPage decides whether to continue.
func AdminDescribeShardDistribution(c *cli.Context) {
	adminClient := cFactory.ServerAdminClient(c)

	ctx, cancel := newContext(c)
	defer cancel()

	req := &types.DescribeShardDistributionRequest{
		PageSize: int32(c.Int(FlagPageSize)),
		PageID:   int32(c.Int(FlagPageID)),
	}

	resp, err := adminClient.DescribeShardDistribution(ctx, req)
	if err != nil {
		ErrorAndExit("Shard list failed", err)
	}

	fmt.Printf("Total Number of Shards: %d \n", resp.NumberOfShards)
	fmt.Printf("Number of Shards Returned: %d \n", len(resp.Shards))

	if len(resp.Shards) == 0 {
		return
	}

	// Map iteration order is unspecified; sort the shard IDs so that pages
	// are stable and ordered from run to run.
	shardIDs := make([]int32, 0, len(resp.Shards))
	for shardID := range resp.Shards {
		shardIDs = append(shardIDs, shardID)
	}
	sort.Slice(shardIDs, func(i, j int) bool { return shardIDs[i] < shardIDs[j] })

	table := []ShardRow{}
	opts := RenderOptions{DefaultTemplate: templateTable, Color: true}
	outputPageSize := tableRenderSize
	for _, shardID := range shardIDs {
		if outputPageSize == 0 {
			Render(c, table, opts)
			table = []ShardRow{}
			if !showNextPage() {
				break
			}
			outputPageSize = tableRenderSize
		}
		table = append(table, ShardRow{ShardID: shardID, Identity: resp.Shards[shardID]})
		outputPageSize--
	}
	// output the remaining rows
	Render(c, table, opts)
}

// AdminDescribeHistoryHost describes the history host looked up by workflow
// ID, shard ID or host address; at least one of them must be provided. The
// shard ID list is stripped from the output unless full detail is requested.
func AdminDescribeHistoryHost(c *cli.Context) {
	adminClient := cFactory.ServerAdminClient(c)

	wid := c.String(FlagWorkflowID)
	sid := c.Int(FlagShardID)
	addr := c.String(FlagHistoryAddress)
	printFully := c.Bool(FlagPrintFullyDetail)

	if len(wid) == 0 && !c.IsSet(FlagShardID) && len(addr) == 0 {
		ErrorAndExit("at least one of them is required to provide to lookup host: workflowID, shardID and host address", nil)
		return
	}

	ctx, cancel := newContext(c)
	defer cancel()

	req := &types.DescribeHistoryHostRequest{}
	if len(wid) > 0 {
		req.ExecutionForHost = &types.WorkflowExecution{WorkflowID: wid}
	}
	// IsSet rather than a zero check, so that shard 0 can be looked up.
	if c.IsSet(FlagShardID) {
		req.ShardIDForHost = common.Int32Ptr(int32(sid))
	}
	if len(addr) > 0 {
		req.HostAddress = common.StringPtr(addr)
	}

	resp, err := adminClient.DescribeHistoryHost(ctx, req)
	if err != nil {
		ErrorAndExit("Describe history host failed", err)
	}

	if !printFully {
		resp.ShardIDs = nil
	}
	prettyPrintJSONObject(resp)
}

// AdminRefreshWorkflowTasks asks the admin service to refresh all the tasks of
// a workflow execution.
func AdminRefreshWorkflowTasks(c *cli.Context) {
	adminClient := cFactory.ServerAdminClient(c)

	domain := getRequiredGlobalOption(c, FlagDomain)
	wid := getRequiredOption(c, FlagWorkflowID)
	rid := c.String(FlagRunID)

	ctx, cancel := newContext(c)
	defer cancel()

	err := adminClient.RefreshWorkflowTasks(ctx, &types.RefreshWorkflowTasksRequest{
		Domain: domain,
		Execution: &types.WorkflowExecution{
			WorkflowID: wid,
			RunID:      rid,
		},
	})
	if err != nil {
		ErrorAndExit("Refresh workflow task failed", err)
		return
	}
	fmt.Println("Refresh workflow task succeeded.")
}

// AdminResetQueue asks the admin service to reset the processing queue states
// of the given shard, cluster and queue type.
func AdminResetQueue(c *cli.Context) {
	adminClient := cFactory.ServerAdminClient(c)

	shardID := getRequiredIntOption(c, FlagShardID)
	clusterName := getRequiredOption(c, FlagCluster)
	typeID := getRequiredIntOption(c, FlagQueueType)

	ctx, cancel := newContext(c)
	defer cancel()

	req := &types.ResetQueueRequest{
		ShardID:     int32(shardID),
		ClusterName: clusterName,
		Type:        common.Int32Ptr(int32(typeID)),
	}

	err := adminClient.ResetQueue(ctx, req)
	if err != nil {
		ErrorAndExit("Failed to reset queue", err)
	}
	fmt.Println("Reset queue state succeeded")
}

// AdminDescribeQueue prints the processing queue states of the given shard,
// cluster and queue type, one state per line.
func AdminDescribeQueue(c *cli.Context) {
	adminClient := cFactory.ServerAdminClient(c)

	shardID := getRequiredIntOption(c, FlagShardID)
	clusterName := getRequiredOption(c, FlagCluster)
	typeID := getRequiredIntOption(c, FlagQueueType)

	ctx, cancel := newContext(c)
	defer cancel()

	req := &types.DescribeQueueRequest{
		ShardID:     int32(shardID),
		ClusterName: clusterName,
		Type:        common.Int32Ptr(int32(typeID)),
	}

	resp, err := adminClient.DescribeQueue(ctx, req)
	if err != nil {
		ErrorAndExit("Failed to describe queue", err)
	}

	for _, state := range resp.ProcessingQueueStates {
		fmt.Println(state)
	}
}