tools/cli/admin_dlq_commands.go (306 lines of code) (raw):

// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package cli

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/urfave/cli"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/types"
)

const (
	// defaultPageSize bounds how many DLQ messages are fetched/merged per RPC page.
	defaultPageSize = 1000
)

// DLQRow is the presentation-layer view of a single replication DLQ message,
// combining the raw task, its metadata, and deserialized history events.
type DLQRow struct {
	ShardID    int                        `header:"Shard ID" json:"shardID"`
	DomainName string                     `header:"Domain Name" json:"domainName"`
	DomainID   string                     `header:"Domain ID" json:"domainID"`
	WorkflowID string                     `header:"Workflow ID" json:"workflowID"`
	RunID      string                     `header:"Run ID" json:"runID"`
	TaskID     int64                      `header:"Task ID" json:"taskID"`
	TaskType   *types.ReplicationTaskType `header:"Task Type" json:"taskType"`

	Version      int64 `json:"version"`
	FirstEventID int64 `json:"firstEventID"`
	NextEventID  int64 `json:"nextEventID"`
	ScheduledID  int64 `json:"scheduledID"`

	ReplicationTask *types.ReplicationTask `json:"replicationTask"`

	// Those are deserialized variants from history replications task
	Events       []*types.HistoryEvent `json:"events"`
	NewRunEvents []*types.HistoryEvent `json:"newRunEvents,omitempty"`

	// Only event IDs for compact table representation
	EventIDs       []int64 `header:"Event IDs"`
	NewRunEventIDs []int64 `header:"New Run Event IDs"`
}

// HistoryDLQCountRow is one row of the per-(cluster, shard) DLQ size report.
type HistoryDLQCountRow struct {
	SourceCluster string `header:"Source Cluster" json:"sourceCluster"`
	ShardID       int32  `header:"Shard ID" json:"shardID"`
	Count         int64  `header:"Count" json:"count"`
}

// AdminCountDLQMessages returns info how many and where DLQ messages are queued
func AdminCountDLQMessages(c *cli.Context) {
	force := c.Bool(FlagForce)
	ctx, cancel := newContext(c)
	defer cancel()

	adminClient := cFactory.ServerAdminClient(c)
	response, err := adminClient.CountDLQMessages(ctx, &types.CountDLQMessagesRequest{ForceFetch: force})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error occurred while getting DLQ count, results may be partial: %v\n", err)
	}
	// An outright RPC failure may leave response nil; bail out instead of
	// panicking on the dereferences below.
	if response == nil {
		return
	}

	if c.String(FlagDLQType) == "domain" {
		fmt.Println(response.Domain)
		return
	}

	table := make([]HistoryDLQCountRow, 0, len(response.History))
	for key, count := range response.History {
		table = append(table, HistoryDLQCountRow{
			SourceCluster: key.SourceCluster,
			ShardID:       key.ShardID,
			Count:         count,
		})
	}
	sort.Slice(table, func(i, j int) bool {
		// First sort by source cluster
		switch strings.Compare(table[i].SourceCluster, table[j].SourceCluster) {
		case -1:
			return true
		case 1:
			return false
		}

		// Then by count in decreasing order
		diff := table[i].Count - table[j].Count
		if diff > 0 {
			return true
		}
		if diff < 0 {
			return false
		}

		// Finally by shard in increasing order
		return table[i].ShardID < table[j].ShardID
	})
	Render(c, table, RenderOptions{Color: true, DefaultTemplate: templateTable})
}

// AdminGetDLQMessages reads replication DLQ messages for the requested shards
// (from --shards or stdin), resolves domain names, deserializes history event
// batches, and renders everything as a table. Reading stops once
// --max_message_count messages have been collected across all shards.
func AdminGetDLQMessages(c *cli.Context) {
	ctx, cancel := newContext(c)
	defer cancel()

	client := cFactory.ServerFrontendClient(c)
	adminClient := cFactory.ServerAdminClient(c)

	dlqType := toQueueType(getRequiredOption(c, FlagDLQType))
	sourceCluster := getRequiredOption(c, FlagSourceCluster)

	remainingMessageCount := common.EndMessageID
	if c.IsSet(FlagMaxMessageCount) {
		remainingMessageCount = c.Int64(FlagMaxMessageCount)
	}
	lastMessageID := common.EndMessageID
	if c.IsSet(FlagLastMessageID) {
		lastMessageID = c.Int64(FlagLastMessageID)
	}

	// Cache for domain names: avoids a DescribeDomain round trip per message.
	domainNames := map[string]string{}
	getDomainName := func(domainId string) string {
		if domainName, ok := domainNames[domainId]; ok {
			return domainName
		}

		resp, err := client.DescribeDomain(ctx, &types.DescribeDomainRequest{UUID: common.StringPtr(domainId)})
		if err != nil {
			ErrorAndExit("failed to describe domain", err)
		}
		domainNames[domainId] = resp.DomainInfo.Name
		return resp.DomainInfo.Name
	}

	// readShard pages through one shard's DLQ, decrementing the shared
	// remainingMessageCount and returning early once the budget is spent.
	readShard := func(shardID int) []DLQRow {
		var rows []DLQRow
		var pageToken []byte
		for {
			resp, err := adminClient.ReadDLQMessages(ctx, &types.ReadDLQMessagesRequest{
				Type:                  dlqType,
				SourceCluster:         sourceCluster,
				ShardID:               int32(shardID),
				InclusiveEndMessageID: common.Int64Ptr(lastMessageID),
				MaximumPageSize:       defaultPageSize,
				NextPageToken:         pageToken,
			})
			if err != nil {
				ErrorAndExit(fmt.Sprintf("fail to read dlq message for shard: %d", shardID), err)
			}

			// Index raw tasks by ID so each task-info entry can be joined
			// with its full replication task payload.
			replicationTasks := map[int64]*types.ReplicationTask{}
			for _, task := range resp.ReplicationTasks {
				replicationTasks[task.SourceTaskID] = task
			}

			for _, info := range resp.ReplicationTasksInfo {
				task := replicationTasks[info.TaskID]
				var taskType *types.ReplicationTaskType
				if task != nil {
					taskType = task.TaskType
				}

				// Getters are nil-safe, so a missing task yields empty blobs here.
				events := deserializeBatchEvents(task.GetHistoryTaskV2Attributes().GetEvents())
				newRunEvents := deserializeBatchEvents(task.GetHistoryTaskV2Attributes().GetNewRunEvents())

				rows = append(rows, DLQRow{
					ShardID:         shardID,
					DomainName:      getDomainName(info.DomainID),
					DomainID:        info.DomainID,
					WorkflowID:      info.WorkflowID,
					RunID:           info.RunID,
					TaskType:        taskType,
					TaskID:          info.TaskID,
					Version:         info.Version,
					FirstEventID:    info.FirstEventID,
					NextEventID:     info.NextEventID,
					ScheduledID:     info.ScheduledID,
					ReplicationTask: task,
					Events:          events,
					EventIDs:        collectEventIDs(events),
					NewRunEvents:    newRunEvents,
					NewRunEventIDs:  collectEventIDs(newRunEvents),
				})

				remainingMessageCount--
				if remainingMessageCount <= 0 {
					return rows
				}
			}

			if len(resp.NextPageToken) == 0 {
				break
			}
			pageToken = resp.NextPageToken
		}
		return rows
	}

	table := []DLQRow{}
	for shardID := range getShards(c) {
		if remainingMessageCount <= 0 {
			break
		}
		table = append(table, readShard(shardID)...)
	}

	Render(c, table, RenderOptions{DefaultTemplate: templateTable, Color: true})
}

// AdminPurgeDLQMessages deletes messages from DLQ
func AdminPurgeDLQMessages(c *cli.Context) {
	dlqType := getRequiredOption(c, FlagDLQType)
	sourceCluster := getRequiredOption(c, FlagSourceCluster)
	var lastMessageID *int64
	if c.IsSet(FlagLastMessageID) {
		lastMessageID = common.Int64Ptr(c.Int64(FlagLastMessageID))
	}

	adminClient := cFactory.ServerAdminClient(c)
	for shardID := range getShards(c) {
		// Fresh context per shard so one slow shard cannot starve the rest.
		ctx, cancel := newContext(c)
		err := adminClient.PurgeDLQMessages(ctx, &types.PurgeDLQMessagesRequest{
			Type:                  toQueueType(dlqType),
			SourceCluster:         sourceCluster,
			ShardID:               int32(shardID),
			InclusiveEndMessageID: lastMessageID,
		})
		cancel()
		if err != nil {
			fmt.Printf("Failed to purge DLQ message in shard %v with error: %v.\n", shardID, err)
			continue
		}
		// Brief pause to avoid hammering the server across many shards.
		time.Sleep(10 * time.Millisecond)
		fmt.Printf("Successfully purge DLQ Messages in shard %v.\n", shardID)
	}
}

// AdminMergeDLQMessages merges message from DLQ
func AdminMergeDLQMessages(c *cli.Context) {
	dlqType := getRequiredOption(c, FlagDLQType)
	sourceCluster := getRequiredOption(c, FlagSourceCluster)
	var lastMessageID *int64
	if c.IsSet(FlagLastMessageID) {
		lastMessageID = common.Int64Ptr(c.Int64(FlagLastMessageID))
	}

	adminClient := cFactory.ServerAdminClient(c)
ShardIDLoop:
	for shardID := range getShards(c) {
		request := &types.MergeDLQMessagesRequest{
			Type:                  toQueueType(dlqType),
			SourceCluster:         sourceCluster,
			ShardID:               int32(shardID),
			InclusiveEndMessageID: lastMessageID,
			MaximumPageSize:       defaultPageSize,
		}

		// Page through the shard until the server reports no more messages.
		for {
			ctx, cancel := newContext(c)
			response, err := adminClient.MergeDLQMessages(ctx, request)
			cancel()
			if err != nil {
				fmt.Printf("Failed to merge DLQ message in shard %v with error: %v.\n", shardID, err)
				continue ShardIDLoop
			}

			if len(response.NextPageToken) == 0 {
				break
			}
			request.NextPageToken = response.NextPageToken
		}
		fmt.Printf("Successfully merged all messages in shard %v.\n", shardID)
	}
}

// getShards returns a channel of shard IDs: piped from stdin when it is not a
// terminal, otherwise generated from the --shards flag.
func getShards(c *cli.Context) chan int {
	// Check if we have stdin available
	stat, err := os.Stdin.Stat()
	if err == nil && (stat.Mode()&os.ModeCharDevice) == 0 {
		return readShardsFromStdin()
	}
	return generateShardRangeFromFlags(c)
}

// generateShardRangeFromFlags parses the --shards multi-range expression and
// streams each shard ID on the returned channel, closing it when done.
func generateShardRangeFromFlags(c *cli.Context) chan int {
	shards := make(chan int)
	go func() {
		defer close(shards)
		shardRange, err := parseIntMultiRange(c.String(FlagShards))
		if err != nil {
			fmt.Printf("failed to parse shard range: %q\n", c.String(FlagShards))
			return
		}
		for _, shard := range shardRange {
			shards <- shard
		}
	}()
	return shards
}

// readShardsFromStdin streams one shard ID per stdin line on the returned
// channel, closing it at EOF. Unparsable lines are reported and skipped.
func readShardsFromStdin() chan int {
	shards := make(chan int)
	go func() {
		defer close(shards)
		reader := bufio.NewReader(os.Stdin)
		for {
			line, err := reader.ReadString('\n')
			// Process any data first: ReadString returns the final line
			// together with io.EOF when stdin lacks a trailing newline, and
			// the previous break-on-EOF dropped that last shard ID.
			if trimmed := strings.TrimSpace(line); trimmed != "" {
				shard, parseErr := strconv.ParseInt(trimmed, 10, 64)
				if parseErr != nil {
					fmt.Printf("Failed to parse shard id: %q\n", line)
				} else {
					shards <- int(shard)
				}
			}
			if err == io.EOF {
				return
			}
			if err != nil {
				// A persistent read error would spin forever if we retried.
				fmt.Printf("Unable to read from stdin: %v", err)
				return
			}
		}
	}()
	return shards
}

// toQueueType maps the CLI --dlq_type value to the API enum, exiting the
// process on an unsupported value.
func toQueueType(dlqType string) *types.DLQType {
	switch dlqType {
	case "domain":
		return types.DLQTypeDomain.Ptr()
	case "history":
		return types.DLQTypeReplication.Ptr()
	default:
		ErrorAndExit("The queue type is not supported.", fmt.Errorf("the queue type is not supported. Type: %v", dlqType))
	}
	return nil
}

// deserializeBatchEvents decodes a serialized history-event batch blob,
// returning nil for a nil blob and exiting on decode failure.
func deserializeBatchEvents(blob *types.DataBlob) []*types.HistoryEvent {
	if blob == nil {
		return nil
	}
	serializer := persistence.NewPayloadSerializer()
	events, err := serializer.DeserializeBatchEvents(persistence.NewDataBlobFromInternal(blob))
	if err != nil {
		ErrorAndExit("Failed to decode DLQ history replication events", err)
	}
	return events
}

// collectEventIDs extracts just the event IDs for the compact table view.
func collectEventIDs(events []*types.HistoryEvent) []int64 {
	ids := make([]int64, 0, len(events))
	for _, event := range events {
		ids = append(ids, event.ID)
	}
	return ids
}