tools/cli/domain_migration_command.go (429 lines of code) (raw):

// The MIT License (MIT) // Copyright (c) 2017-2020 Uber Technologies Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. package cli import ( "context" "encoding/json" "fmt" "reflect" "strconv" "strings" "sync" "time" "github.com/urfave/cli" "github.com/uber/cadence/client/admin" "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/common/dynamicconfig" dc "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/types" ) const ( // workflows running longer than 14 days are considered long running workflows longRunningDuration = 14 * 24 * time.Hour ) var ( domainMigrationTemplate = `Validation Check: {{- range .}} - {{.ValidationCheck}}: {{.ValidationResult}} {{- with .ValidationDetails}} {{- with .CurrentDomainRow}} Current Domain: Name: {{.DomainInfo.Name}} UUID: {{.DomainInfo.UUID}} {{- end}} {{- with .NewDomainRow}} New Domain: Name: {{.DomainInfo.Name}} UUID: {{.DomainInfo.UUID}} {{- end}} {{- if ne (len .MismatchedDomainMetaData) 0 }} Mismatched Domain Meta Data: {{.MismatchedDomainMetaData}} {{- end }} {{- if .LongRunningWorkFlowNum}} Long Running Workflow Num (> 14 days): {{.LongRunningWorkFlowNum}} {{- end}} {{- if .MissingCurrSearchAttributes}} Missing Search Attributes in Current Domain: {{- range .MissingCurrSearchAttributes}} - {{.}} {{- end}} {{- end}} {{- if .MissingNewSearchAttributes}} Missing Search Attributes in New Domain: {{- range .MissingNewSearchAttributes}} - {{.}} {{- end}} {{- end}} {{- range .MismatchedDynamicConfig}} {{- $dynamicConfig := . }} - Config Key: {{.Key}} {{- range $i, $v := .CurrValues}} Current Response: Data: {{ printf "%s" (index $dynamicConfig.CurrValues $i).Value.Data }} Filters: {{- range $filter := (index $dynamicConfig.CurrValues $i).Filters}} - Name: {{ $filter.Name }} Value: {{ printf "%s" $filter.Value.Data }} {{- end}} New Response: Data: {{ printf "%s" (index $dynamicConfig.NewValues $i).Value.Data }} Filters: {{- range $filter := (index $dynamicConfig.NewValues $i).Filters}} - Name: {{ $filter.Name }} Value: {{ printf "%s" $filter.Value.Data }} {{- end}} {{- end}} {{- end}} {{- end}} {{- end}} ` emptyGetDynamicConfigRequest = &types.GetDynamicConfigResponse{ Value: &types.DataBlob{ EncodingType: types.EncodingTypeJSON.Ptr(), }, } ) type DomainMigrationCommand interface { Validation(c *cli.Context) DomainMetaDataCheck(c *cli.Context) DomainMigrationRow DomainWorkFlowCheck(c *cli.Context) DomainMigrationRow SearchAttributesChecker(c *cli.Context) DomainMigrationRow DynamicConfigCheck(c *cli.Context) DomainMigrationRow } func (d *domainMigrationCLIImpl) NewDomainMigrationCLIImpl(c *cli.Context) *domainMigrationCLIImpl { return &domainMigrationCLIImpl{ frontendClient: cFactory.ServerFrontendClient(c), destinationClient: cFactory.ServerFrontendClientForMigration(c), frontendAdminClient: cFactory.ServerAdminClient(c), destinationAdminClient: cFactory.ServerAdminClientForMigration(c), } } // Export a function to create an instance of the domainMigrationCLIImpl. func NewDomainMigrationCommand(c *cli.Context) DomainMigrationCommand { return &domainMigrationCLIImpl{} } type domainMigrationCLIImpl struct { frontendClient, destinationClient frontend.Client frontendAdminClient, destinationAdminClient admin.Client } func (d *domainMigrationCLIImpl) Validation(c *cli.Context) { checkers := []func(*cli.Context) DomainMigrationRow{ d.DomainMetaDataCheck, d.DomainWorkFlowCheck, d.DynamicConfigCheck, d.SearchAttributesChecker, } wg := &sync.WaitGroup{} results := make([]DomainMigrationRow, len(checkers)) for i := range checkers { wg.Add(1) go func(i int) { defer wg.Done() results[i] = checkers[i](c) }(i) } wg.Wait() renderOpts := RenderOptions{ DefaultTemplate: domainMigrationTemplate, Color: true, Border: true, PrintDateTime: true, } if err := Render(c, results, renderOpts); err != nil { ErrorAndExit("Failed to render", err) } } func (d *domainMigrationCLIImpl) DomainMetaDataCheck(c *cli.Context) DomainMigrationRow { domain := c.GlobalString(FlagDomain) newDomain := c.String(FlagDestinationDomain) ctx, cancel := newContext(c) defer cancel() currResp, err := d.frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{ Name: &domain, }) if err != nil { ErrorAndExit(fmt.Sprintf("Could not describe old domain, Please check to see if old domain exists before migrating."), err) } newResp, err := d.destinationClient.DescribeDomain(ctx, &types.DescribeDomainRequest{ Name: &newDomain, }) if err != nil { ErrorAndExit(fmt.Sprintf("Could not describe new domain, Please check to see if new domain exists before migrating."), err) } validationResult, mismatchedMetaData := metaDataValidation(currResp, newResp) validationRow := DomainMigrationRow{ ValidationCheck: "Domain Meta Data", ValidationResult: validationResult, ValidationDetails: ValidationDetails{ CurrentDomainRow: currResp, NewDomainRow: newResp, MismatchedDomainMetaData: mismatchedMetaData, }, } return validationRow } func metaDataValidation(currResp *types.DescribeDomainResponse, newResp *types.DescribeDomainResponse) (bool, string) { if !reflect.DeepEqual(currResp.Configuration, newResp.Configuration) { return false, "mismatched DomainConfiguration" } if currResp.DomainInfo.OwnerEmail != newResp.DomainInfo.OwnerEmail { return false, "mismatched OwnerEmail" } return true, "" } func (d *domainMigrationCLIImpl) DomainWorkFlowCheck(c *cli.Context) DomainMigrationRow { countWorkFlows := d.countLongRunningWorkflow(c) check := countWorkFlows == 0 return DomainMigrationRow{ ValidationCheck: "Workflow Check", ValidationResult: check, ValidationDetails: ValidationDetails{ LongRunningWorkFlowNum: &countWorkFlows, }, } } func (d *domainMigrationCLIImpl) countLongRunningWorkflow(c *cli.Context) int { domain := c.GlobalString(FlagDomain) thresholdOfLongRunning := time.Now().Add(-longRunningDuration) request := &types.CountWorkflowExecutionsRequest{ Domain: domain, Query: "CloseTime=missing AND StartTime < " + strconv.FormatInt(thresholdOfLongRunning.UnixNano(), 10), } ctx, cancel := newContextForLongPoll(c) defer cancel() response, err := d.frontendClient.CountWorkflowExecutions(ctx, request) if err != nil { ErrorAndExit("Failed to count workflow.", err) } return int(response.GetCount()) } func (d *domainMigrationCLIImpl) SearchAttributesChecker(c *cli.Context) DomainMigrationRow { ctx, cancel := newContext(c) defer cancel() // getting user provided search attributes searchAttributes := c.StringSlice(FlagSearchAttribute) if len(searchAttributes) == 0 { return DomainMigrationRow{ ValidationCheck: "Search Attributes Check", ValidationResult: true, } } // Parse the provided search attributes into a map[string]IndexValueType requiredAttributes := make(map[string]types.IndexedValueType) for _, attr := range searchAttributes { parts := strings.SplitN(attr, ":", 2) if len(parts) != 2 { ErrorAndExit(fmt.Sprintf("Invalid search attribute format: %s", attr), nil) } key, valueType := parts[0], parts[1] ivt, err := parseIndexedValueType(valueType) if err != nil { ErrorAndExit(fmt.Sprintf("Invalid search attribute type for %s: %s", key, valueType), err) } requiredAttributes[key] = ivt } // getting search attributes for current domain currentSearchAttributes, err := d.frontendClient.GetSearchAttributes(ctx) if err != nil { ErrorAndExit("Unable to get search attributes for current domain.", err) } // getting search attributes for new domain destinationSearchAttributes, err := d.destinationClient.GetSearchAttributes(ctx) if err != nil { ErrorAndExit("Unable to get search attributes for new domain.", err) } currentSearchAttrs := currentSearchAttributes.Keys destinationSearchAttrs := destinationSearchAttributes.Keys // checking to see if search attributes exist missingInCurrent := findMissingAttributes(requiredAttributes, currentSearchAttrs) missingInNew := findMissingAttributes(requiredAttributes, destinationSearchAttrs) validationResult := len(missingInCurrent) == 0 && len(missingInNew) == 0 validationRow := DomainMigrationRow{ ValidationCheck: "Search Attributes Check", ValidationResult: validationResult, ValidationDetails: ValidationDetails{ MissingCurrSearchAttributes: missingInCurrent, MissingNewSearchAttributes: missingInNew, }, } return validationRow } // helper to parse types.IndexedValueType from string func parseIndexedValueType(valueType string) (types.IndexedValueType, error) { var result types.IndexedValueType valueTypeBytes := []byte(valueType) if err := result.UnmarshalText(valueTypeBytes); err != nil { return 0, err } return result, nil } // finds missing attributed in a map of existing attributed based on required attributes func findMissingAttributes(requiredAttributes map[string]types.IndexedValueType, existingAttributes map[string]types.IndexedValueType) []string { missingAttributes := make([]string, 0) for key, requiredType := range requiredAttributes { existingType, ok := existingAttributes[key] if !ok || existingType != requiredType { // construct the key:type string format attr := fmt.Sprintf("%s:%s", key, requiredType) missingAttributes = append(missingAttributes, attr) } } return missingAttributes } func (d *domainMigrationCLIImpl) DynamicConfigCheck(c *cli.Context) DomainMigrationRow { var mismatchedConfigs []MismatchedDynamicConfig check := true resp := dynamicconfig.ListAllProductionKeys() currDomain := c.GlobalString(FlagDomain) newDomain := c.String(FlagDestinationDomain) ctx, cancel := newContext(c) defer cancel() currentDomainID := getDomainID(ctx, currDomain, d.frontendClient) destinationDomainID := getDomainID(ctx, newDomain, d.destinationClient) if currentDomainID == "" { ErrorAndExit("Failed to get domainID for the current domain.", nil) } if destinationDomainID == "" { ErrorAndExit("Failed to get domainID for the destination domain.", nil) } for _, configKey := range resp { if len(configKey.Filters()) == 1 && configKey.Filters()[0] == dc.DomainName { // Validate dynamic configs with only domainName filter currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ dynamicconfig.DomainFilter(currDomain), }) newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ dynamicconfig.DomainFilter(newDomain), }) currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) if err != nil { // empty to indicate N/A currResp = emptyGetDynamicConfigRequest } newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) if err != nil { // empty to indicate N/A newResp = emptyGetDynamicConfigRequest } if !reflect.DeepEqual(currResp.Value, newResp.Value) { check = false mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ Key: configKey, CurrValues: []*types.DynamicConfigValue{ toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ dynamicconfig.DomainName: currDomain, }), }, NewValues: []*types.DynamicConfigValue{ toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ dynamicconfig.DomainName: newDomain, }), }, }) } } else if len(configKey.Filters()) == 1 && configKey.Filters()[0] == dc.DomainID { // Validate dynamic configs with only domainID filter currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ dynamicconfig.DomainIDFilter(currentDomainID), }) newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ dynamicconfig.DomainIDFilter(destinationDomainID), }) currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) if err != nil { // empty to indicate N/A currResp = emptyGetDynamicConfigRequest } newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) if err != nil { // empty to indicate N/A newResp = emptyGetDynamicConfigRequest } if !reflect.DeepEqual(currResp.Value, newResp.Value) { check = false mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ Key: configKey, CurrValues: []*types.DynamicConfigValue{ toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ dynamicconfig.DomainID: currentDomainID, }), }, NewValues: []*types.DynamicConfigValue{ toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ dynamicconfig.DomainID: destinationDomainID, }), }, }) } } else if containsFilter(configKey, dc.DomainName.String()) && containsFilter(configKey, dc.TaskListName.String()) { // Validate dynamic configs with only domainName and TaskList filters taskLists := c.StringSlice(FlagTaskList) var mismatchedCurValues []*types.DynamicConfigValue var mismatchedNewValues []*types.DynamicConfigValue for _, taskList := range taskLists { currRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ dynamicconfig.DomainFilter(currDomain), dynamicconfig.TaskListFilter(taskList), }) newRequest := dynamicconfig.ToGetDynamicConfigFilterRequest(configKey.String(), []dynamicconfig.FilterOption{ dynamicconfig.DomainFilter(newDomain), dynamicconfig.TaskListFilter(taskList), }) currResp, err := d.frontendAdminClient.GetDynamicConfig(ctx, currRequest) if err != nil { // empty to indicate N/A currResp = emptyGetDynamicConfigRequest } newResp, err := d.destinationAdminClient.GetDynamicConfig(ctx, newRequest) if err != nil { // empty to indicate N/A newResp = emptyGetDynamicConfigRequest } if !reflect.DeepEqual(currResp.Value, newResp.Value) { check = false mismatchedCurValues = append(mismatchedCurValues, toDynamicConfigValue(currResp.Value, map[dc.Filter]interface{}{ dynamicconfig.DomainName: currDomain, dynamicconfig.TaskListName: taskLists, })) mismatchedNewValues = append(mismatchedNewValues, toDynamicConfigValue(newResp.Value, map[dc.Filter]interface{}{ dynamicconfig.DomainName: newDomain, dynamicconfig.TaskListName: taskLists, })) } } if len(mismatchedCurValues) > 0 && len(mismatchedNewValues) > 0 { mismatchedConfigs = append(mismatchedConfigs, MismatchedDynamicConfig{ Key: configKey, CurrValues: mismatchedCurValues, NewValues: mismatchedNewValues, }) } } } validationRow := DomainMigrationRow{ ValidationCheck: "Dynamic Config Check", ValidationResult: check, ValidationDetails: ValidationDetails{ MismatchedDynamicConfig: mismatchedConfigs, }, } return validationRow } func getDomainID(c context.Context, domain string, client frontend.Client) string { resp, err := client.DescribeDomain(c, &types.DescribeDomainRequest{Name: &domain}) if err != nil { ErrorAndExit("Failed to describe domain.", err) } return resp.DomainInfo.GetUUID() } func valueToDataBlob(value interface{}) *types.DataBlob { if value == nil { return nil } // No need to handle error as this is a private helper method // where the correct value will always be passed regardless data, _ := json.Marshal(value) return &types.DataBlob{ EncodingType: types.EncodingTypeJSON.Ptr(), Data: data, } } func toDynamicConfigValue(value *types.DataBlob, filterMaps map[dynamicconfig.Filter]interface{}) *types.DynamicConfigValue { var configFilters []*types.DynamicConfigFilter for filter, filterValue := range filterMaps { configFilters = append(configFilters, &types.DynamicConfigFilter{ Name: filter.String(), Value: valueToDataBlob(filterValue), }) } return &types.DynamicConfigValue{ Value: value, Filters: configFilters, } } func containsFilter(key dynamicconfig.Key, value string) bool { filters := key.Filters() for _, filter := range filters { if filter.String() == value { return true } } return false }