tools/cli/admin_kafka_commands.go (469 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package cli

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"regexp"
	"sync/atomic"
	"time"

	"github.com/urfave/cli"
	"go.uber.org/thriftrw/protocol/binary"
	"go.uber.org/thriftrw/wire"

	"github.com/uber/cadence/.gen/go/indexer"
	"github.com/uber/cadence/.gen/go/replicator"
	"github.com/uber/cadence/client/admin"
	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/config"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/common/types/mapper/thrift"
)

type (
	// filterFn decides whether a replication task should be written to the output.
	filterFn func(*types.ReplicationTask) bool
	// filterFnForVisibility decides whether a visibility message should be written to the output.
	filterFnForVisibility func(*indexer.Message) bool

	// kafkaMessageType discriminates which payload type a kafka dump contains.
	kafkaMessageType int

	// historyV2Task is the JSON output shape for a HistoryV2 replication task
	// whose event blobs have been deserialized into concrete history events.
	historyV2Task struct {
		Task         *types.ReplicationTask
		Events       []*types.HistoryEvent
		NewRunEvents []*types.HistoryEvent
	}
)

const (
	kafkaMessageTypeReplicationTask kafkaMessageType = iota
	kafkaMessageTypeVisibilityMsg
)

const (
	bufferSize                  = 8192
	preambleVersion0       byte = 0x59 // thrift binary payloads in the dump start with this byte
	malformedMessage            = "Input was malformed"
	chanBufferSize              = 10000
	maxRereplicateEventID       = 999999
	defaultResendContextTimeout = 30 * time.Second
)

var (
	// r matches the per-message header line emitted by the kafka dump tool
	// (requires the dump to have been generated with -v).
	r = regexp.MustCompile(`Partition: .*?, Offset: .*?, Key: .*?`)
)

// writerChannel carries parsed messages from the parser goroutine to the
// writer goroutine. Exactly one of the two channels is non-nil, selected
// by Type.
type writerChannel struct {
	Type                   kafkaMessageType
	ReplicationTaskChannel chan *types.ReplicationTask
	VisibilityMsgChannel   chan *indexer.Message
}

// newWriterChannel creates a writerChannel with the channel matching
// messageType allocated; the other channel stays nil.
func newWriterChannel(messageType kafkaMessageType) *writerChannel {
	ch := &writerChannel{
		Type: messageType,
	}
	switch messageType {
	case kafkaMessageTypeReplicationTask:
		ch.ReplicationTaskChannel = make(chan *types.ReplicationTask, chanBufferSize)
	case kafkaMessageTypeVisibilityMsg:
		ch.VisibilityMsgChannel = make(chan *indexer.Message, chanBufferSize)
	}
	return ch
}

// Close closes whichever channel was allocated, signaling the writer to stop.
func (ch *writerChannel) Close() {
	if ch.ReplicationTaskChannel != nil {
		close(ch.ReplicationTaskChannel)
	}
	if ch.VisibilityMsgChannel != nil {
		close(ch.VisibilityMsgChannel)
	}
}

// AdminKafkaParse parses the output of k8read and outputs replication tasks
func AdminKafkaParse(c *cli.Context) {
	inputFile := getInputFile(c.String(FlagInputFile))
	outputFile := getOutputFile(c.String(FlagOutputFilename))

	defer inputFile.Close()
	defer outputFile.Close()

	readerCh := make(chan []byte, chanBufferSize)
	writerCh := newWriterChannel(kafkaMessageType(c.Int(FlagMessageType)))
	doneCh := make(chan struct{})
	serializer := persistence.NewPayloadSerializer()

	var skippedCount int32
	skipErrMode := c.Bool(FlagSkipErrorMode)

	// Pipeline: reader -> parser -> writer; doneCh is closed by the writer.
	go startReader(inputFile, readerCh)
	go startParser(readerCh, writerCh, skipErrMode, &skippedCount)
	go startWriter(outputFile, writerCh, doneCh, &skippedCount, serializer, c)

	<-doneCh

	if skipErrMode {
		// NOTE: trailing newline added so the summary does not run into the shell prompt.
		fmt.Printf("%v messages were skipped due to errors in parsing\n", atomic.LoadInt32(&skippedCount))
	}
}

// buildFilterFn returns a filter that keeps only HistoryTaskV2 replication
// tasks matching the given workflowID and/or runID. With both filters empty
// it keeps every task.
func buildFilterFn(workflowID, runID string) filterFn {
	return func(task *types.ReplicationTask) bool {
		if len(workflowID) != 0 || len(runID) != 0 {
			// Filtering requested: only HistoryTaskV2 tasks carry the IDs.
			if task.GetHistoryTaskV2Attributes() == nil {
				return false
			}
		}
		if len(workflowID) != 0 && task.GetHistoryTaskV2Attributes().WorkflowID != workflowID {
			return false
		}
		if len(runID) != 0 && task.GetHistoryTaskV2Attributes().RunID != runID {
			return false
		}
		return true
	}
}

// buildFilterFnForVisibility returns a filter that keeps only visibility
// messages matching the given workflowID and/or runID.
func buildFilterFnForVisibility(workflowID, runID string) filterFnForVisibility {
	return func(msg *indexer.Message) bool {
		if len(workflowID) != 0 && msg.GetWorkflowID() != workflowID {
			return false
		}
		if len(runID) != 0 && msg.GetRunID() != runID {
			return false
		}
		return true
	}
}

// getOutputFile opens outputFile for writing, defaulting to stdout when the
// name is empty. Exits the process on failure.
func getOutputFile(outputFile string) *os.File {
	if len(outputFile) == 0 {
		return os.Stdout
	}
	f, err := os.Create(outputFile)
	if err != nil {
		ErrorAndExit("failed to create output file", err)
	}
	return f
}

// startReader streams the input file into readerCh in bufferSize chunks and
// closes the channel at EOF. Exits the process on any other read error.
func startReader(file *os.File, readerCh chan<- []byte) {
	defer close(readerCh)
	reader := bufio.NewReader(file)
	for {
		buf := make([]byte, bufferSize)
		n, err := reader.Read(buf)
		if err != nil {
			if err != io.EOF {
				ErrorAndExit("Failed to read from reader", err)
			} else {
				break
			}
		}
		buf = buf[:n]
		readerCh <- buf
	}
}

// startParser accumulates raw chunks from readerCh, splits them on message
// headers, and feeds decoded messages to writerCh. The trailing buffer
// (after the last header) is parsed once the reader channel closes.
func startParser(readerCh <-chan []byte, writerCh *writerChannel, skipErrors bool, skippedCount *int32) {
	defer writerCh.Close()

	var buffer []byte
Loop:
	for {
		data, ok := <-readerCh
		if !ok {
			break Loop
		}
		buffer = append(buffer, data...)
		// Everything up to the last header is complete; keep the rest.
		data, nextBuffer := splitBuffer(buffer)
		buffer = nextBuffer
		parse(data, skipErrors, skippedCount, writerCh)
	}
	parse(buffer, skipErrors, skippedCount, writerCh)
}

// startWriter dispatches to the writer matching the message type and closes
// doneCh when the corresponding input channel is drained.
func startWriter(
	outputFile *os.File,
	writerCh *writerChannel,
	doneCh chan struct{},
	skippedCount *int32,
	serializer persistence.PayloadSerializer,
	c *cli.Context,
) {
	defer close(doneCh)

	skipErrMode := c.Bool(FlagSkipErrorMode)
	headerMode := c.Bool(FlagHeadersMode)

	switch writerCh.Type {
	case kafkaMessageTypeReplicationTask:
		writeReplicationTask(outputFile, writerCh, skippedCount, skipErrMode, headerMode, serializer, c)
	case kafkaMessageTypeVisibilityMsg:
		writeVisibilityMessage(outputFile, writerCh, skippedCount, skipErrMode, headerMode, c)
	}
}

// writeReplicationTask drains the replication-task channel, applying the
// workflow/run-ID filter, and writes each task either as JSON or (in header
// mode) as a "domainID, workflowID, runID" line.
func writeReplicationTask(
	outputFile *os.File,
	writerCh *writerChannel,
	skippedCount *int32,
	skipErrMode bool,
	headerMode bool,
	serializer persistence.PayloadSerializer,
	c *cli.Context,
) {
	filter := buildFilterFn(c.String(FlagWorkflowID), c.String(FlagRunID))
Loop:
	for {
		task, ok := <-writerCh.ReplicationTaskChannel
		if !ok {
			break Loop
		}
		if filter(task) {
			jsonStr, err := decodeReplicationTask(task, serializer)
			if err != nil {
				if !skipErrMode {
					ErrorAndExit(malformedMessage, fmt.Errorf("failed to encode into json, err: %v", err))
				} else {
					atomic.AddInt32(skippedCount, 1)
					continue Loop
				}
			}

			var outStr string
			if !headerMode {
				outStr = string(jsonStr)
			} else {
				// When no workflow/run-ID filter is set, non-HistoryTaskV2
				// tasks reach this point with nil attributes; guard against a
				// nil-pointer dereference instead of panicking.
				attrs := task.GetHistoryTaskV2Attributes()
				if attrs == nil {
					atomic.AddInt32(skippedCount, 1)
					continue Loop
				}
				outStr = fmt.Sprintf(
					"%v, %v, %v",
					attrs.DomainID,
					attrs.WorkflowID,
					attrs.RunID,
				)
			}
			_, err = outputFile.WriteString(fmt.Sprintf("%v\n", outStr))
			if err != nil {
				ErrorAndExit("Failed to write to file", fmt.Errorf("err: %v", err))
			}
		}
	}
}

// writeVisibilityMessage drains the visibility-message channel, applying the
// workflow/run-ID filter, and writes each message either as JSON or (in
// header mode) as a summary line.
func writeVisibilityMessage(
	outputFile *os.File,
	writerCh *writerChannel,
	skippedCount *int32,
	skipErrMode bool,
	headerMode bool,
	c *cli.Context,
) {
	filter := buildFilterFnForVisibility(c.String(FlagWorkflowID), c.String(FlagRunID))
Loop:
	for {
		msg, ok := <-writerCh.VisibilityMsgChannel
		if !ok {
			break Loop
		}
		if filter(msg) {
			jsonStr, err := json.Marshal(msg)
			if err != nil {
				if !skipErrMode {
					ErrorAndExit(malformedMessage, fmt.Errorf("failed to encode into json, err: %v", err))
				} else {
					atomic.AddInt32(skippedCount, 1)
					continue Loop
				}
			}

			var outStr string
			if !headerMode {
				outStr = string(jsonStr)
			} else {
				outStr = fmt.Sprintf(
					"%v, %v, %v, %v, %v",
					msg.GetDomainID(),
					msg.GetWorkflowID(),
					msg.GetRunID(),
					msg.GetMessageType().String(),
					msg.GetVersion(),
				)
			}
			_, err = outputFile.WriteString(fmt.Sprintf("%v\n", outStr))
			if err != nil {
				ErrorAndExit("Failed to write to file", fmt.Errorf("err: %v", err))
			}
		}
	}
}

// splitBuffer splits buffer at the start of the last header match: the first
// return value holds complete messages, the second the (possibly partial)
// remainder. Exits the process when no header is found.
func splitBuffer(buffer []byte) ([]byte, []byte) {
	matches := r.FindAllIndex(buffer, -1)
	if len(matches) == 0 {
		ErrorAndExit(malformedMessage, errors.New("header not found, did you generate dump with -v"))
	}
	splitIndex := matches[len(matches)-1][0]
	return buffer[:splitIndex], buffer[splitIndex:]
}

// parse decodes a chunk of complete messages and pushes them onto the channel
// matching writerCh.Type, accumulating the skip count from both the header
// scan and the thrift deserialization.
func parse(bytes []byte, skipErrors bool, skippedCount *int32, writerCh *writerChannel) {
	messages, skippedGetMsgCount := getMessages(bytes, skipErrors)
	switch writerCh.Type {
	case kafkaMessageTypeReplicationTask:
		msgs, skippedDeserializeCount := deserializeMessages(messages, skipErrors)
		atomic.AddInt32(skippedCount, skippedGetMsgCount+skippedDeserializeCount)
		for _, msg := range msgs {
			writerCh.ReplicationTaskChannel <- msg
		}
	case kafkaMessageTypeVisibilityMsg:
		msgs, skippedDeserializeCount := deserializeVisibilityMessages(messages, skipErrors)
		atomic.AddInt32(skippedCount, skippedGetMsgCount+skippedDeserializeCount)
		for _, msg := range msgs {
			writerCh.VisibilityMsgChannel <- msg
		}
	}
}

// getMessages splits data on header lines and strips everything before each
// message's thrift preamble byte, returning the raw thrift payloads and the
// number of messages skipped (when skipErrors is set).
func getMessages(data []byte, skipErrors bool) ([][]byte, int32) {
	str := string(data)
	messagesWithHeaders := r.Split(str, -1)
	if len(messagesWithHeaders[0]) != 0 {
		ErrorAndExit(malformedMessage, errors.New("got data chunk to handle that does not start with valid header"))
	}
	messagesWithHeaders = messagesWithHeaders[1:]
	var rawMessages [][]byte
	var skipped int32
	for _, m := range messagesWithHeaders {
		if len(m) == 0 {
			ErrorAndExit(malformedMessage, errors.New("got empty message between valid headers"))
		}
		curr := []byte(m)
		messageStart := bytes.Index(curr, []byte{preambleVersion0})
		if messageStart == -1 {
			if !skipErrors {
				ErrorAndExit(malformedMessage, errors.New("failed to find message preamble"))
			} else {
				skipped++
				continue
			}
		}
		rawMessages = append(rawMessages, curr[messageStart:])
	}
	return rawMessages, skipped
}

// deserializeMessages decodes raw thrift payloads into replication tasks,
// returning the tasks and the number of payloads skipped on decode errors
// (when skipErrors is set).
func deserializeMessages(messages [][]byte, skipErrors bool) ([]*types.ReplicationTask, int32) {
	var replicationTasks []*types.ReplicationTask
	var skipped int32
	for _, m := range messages {
		var task replicator.ReplicationTask
		err := decode(m, &task)
		if err != nil {
			if !skipErrors {
				ErrorAndExit(malformedMessage, err)
			} else {
				skipped++
				continue
			}
		}
		replicationTasks = append(replicationTasks, thrift.ToReplicationTask(&task))
	}
	return replicationTasks, skipped
}

// decode thrift-binary-decodes a single replication task, skipping the
// preamble byte at message[0].
func decode(message []byte, val *replicator.ReplicationTask) error {
	reader := bytes.NewReader(message[1:])
	wireVal, err := binary.Default.Decode(reader, wire.TStruct)
	if err != nil {
		return err
	}
	return val.FromWire(wireVal)
}

// deserializeVisibilityMessages decodes raw thrift payloads into visibility
// messages, returning the messages and the number of payloads skipped on
// decode errors (when skipErrors is set).
func deserializeVisibilityMessages(messages [][]byte, skipErrors bool) ([]*indexer.Message, int32) {
	var visibilityMessages []*indexer.Message
	var skipped int32
	for _, m := range messages {
		var msg indexer.Message
		err := decodeVisibility(m, &msg)
		if err != nil {
			if !skipErrors {
				ErrorAndExit(malformedMessage, err)
			} else {
				skipped++
				continue
			}
		}
		visibilityMessages = append(visibilityMessages, &msg)
	}
	return visibilityMessages, skipped
}

// decodeVisibility thrift-binary-decodes a single visibility message,
// skipping the preamble byte at message[0].
func decodeVisibility(message []byte, val *indexer.Message) error {
	reader := bytes.NewReader(message[1:])
	wireVal, err := binary.Default.Decode(reader, wire.TStruct)
	if err != nil {
		return err
	}
	return val.FromWire(wireVal)
}

// ClustersConfig describes the kafka clusters
type ClustersConfig struct {
	Clusters map[string]config.ClusterConfig
	TLS      config.TLS
}

// doRereplicate asks the admin service to resend replication tasks for one
// workflow run from sourceCluster, optionally bounded by endEventID/
// endEventVersion. Exits the process on failure.
func doRereplicate(
	ctx context.Context,
	domainID string,
	wid string,
	rid string,
	endEventID *int64,
	endEventVersion *int64,
	sourceCluster string,
	adminClient admin.Client,
) {
	fmt.Printf("Start rereplication for wid: %v, rid:%v \n", wid, rid)
	if err := adminClient.ResendReplicationTasks(
		ctx,
		&types.ResendReplicationTasksRequest{
			DomainID:      domainID,
			WorkflowID:    wid,
			RunID:         rid,
			RemoteCluster: sourceCluster,
			EndEventID:    endEventID,
			EndVersion:    endEventVersion,
		},
	); err != nil {
		ErrorAndExit("Failed to resend ndc workflow", err)
	}
	fmt.Printf("Done rereplication for wid: %v, rid:%v \n", wid, rid)
}

// AdminRereplicate parses will re-publish replication tasks to topic
func AdminRereplicate(c *cli.Context) {
	sourceCluster := getRequiredOption(c, FlagSourceCluster)
	adminClient := cFactory.ServerAdminClient(c)

	var endEventID, endVersion *int64
	if c.IsSet(FlagMaxEventID) {
		// +1 because the resend API treats EndEventID as exclusive.
		endEventID = common.Int64Ptr(c.Int64(FlagMaxEventID) + 1)
	}
	if c.IsSet(FlagEndEventVersion) {
		endVersion = common.Int64Ptr(c.Int64(FlagEndEventVersion))
	}
	domainID := getRequiredOption(c, FlagDomainID)
	wid := getRequiredOption(c, FlagWorkflowID)
	rid := getRequiredOption(c, FlagRunID)

	contextTimeout := defaultResendContextTimeout
	if c.GlobalIsSet(FlagContextTimeout) {
		contextTimeout = time.Duration(c.GlobalInt(FlagContextTimeout)) * time.Second
	}
	ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
	defer cancel()

	doRereplicate(
		ctx,
		domainID,
		wid,
		rid,
		endEventID,
		endVersion,
		sourceCluster,
		adminClient,
	)
}

// decodeReplicationTask renders a replication task as JSON. HistoryTaskV2
// tasks additionally have their event blobs (and new-run event blobs, when
// present) deserialized into readable history events; the raw blobs are
// nil-ed out so they are not double-printed.
func decodeReplicationTask(
	task *types.ReplicationTask,
	serializer persistence.PayloadSerializer,
) ([]byte, error) {
	switch task.GetTaskType() {
	case types.ReplicationTaskTypeHistoryV2:
		historyV2 := task.GetHistoryTaskV2Attributes()
		events, err := serializer.DeserializeBatchEvents(
			persistence.NewDataBlobFromInternal(historyV2.Events),
		)
		if err != nil {
			return nil, err
		}
		var newRunEvents []*types.HistoryEvent
		if historyV2.NewRunEvents != nil {
			newRunEvents, err = serializer.DeserializeBatchEvents(
				persistence.NewDataBlobFromInternal(historyV2.NewRunEvents),
			)
			if err != nil {
				return nil, err
			}
		}
		historyV2.Events = nil
		historyV2.NewRunEvents = nil
		historyV2Attributes := &historyV2Task{
			Task:         task,
			Events:       events,
			NewRunEvents: newRunEvents,
		}
		return json.Marshal(historyV2Attributes)
	default:
		return json.Marshal(task)
	}
}