func ResetInBatch()

in tools/cli/workflow_commands.go [1509:1640]


// ResetInBatch resets a batch of workflow executions in the domain given by
// the global FlagDomain option.
//
// Target executions come from an input file (FlagInputFile) when one is
// provided, otherwise from a list query (FlagListQuery). Each non-empty line
// of the input file is split on FlagInputSeparator: the first column is the
// workflow ID and the optional second column is the run ID. Workflow IDs
// loaded from the exclude file or returned by the exclude query are skipped
// (at most one of these two exclude options may be set).
//
// Every remaining execution is sent over an unbuffered channel to one of the
// processResets goroutines. After the last target has been handed off the
// done channel is closed and the function waits for all workers to return.
//
// Invalid flag combinations and I/O failures are reported via ErrorAndExit.
func ResetInBatch(c *cli.Context) {
	domain := getRequiredGlobalOption(c, FlagDomain)
	resetType := getRequiredOption(c, FlagResetType)
	decisionOffset := c.Int(FlagDecisionOffset)
	if decisionOffset > 0 {
		ErrorAndExit("Only decision offset <=0 is supported", nil)
	}

	inFileName := c.String(FlagInputFile)
	query := c.String(FlagListQuery)
	excludeFileName := c.String(FlagExcludeFile)
	excludeQuery := c.String(FlagExcludeWorkflowIDByQuery)
	separator := c.String(FlagInputSeparator)

	// The deprecated flag takes precedence unless it is 1, in which case the
	// current flag is consulted.
	// NOTE(review): 1 is presumably the deprecated flag's default — confirm.
	parallel := c.Int(FlagParallismDeprecated)
	if parallel == 1 {
		parallel = c.Int(FlagParallelism)
	}
	// With no workers nothing ever receives from the unbuffered channel and
	// the first send below would block forever.
	if parallel < 1 {
		ErrorAndExit("Parallelism must be at least 1", nil)
	}

	extraForResetType, ok := resetTypesMap[resetType]
	if !ok {
		ErrorAndExit("Not supported reset type", nil)
	}
	if len(extraForResetType) > 0 {
		// Called only for its validation side effect; the value is not used here.
		getRequiredOption(c, extraForResetType)
	}

	if excludeFileName != "" && excludeQuery != "" {
		ErrorAndExit("Only one of the excluding option is allowed", nil)
	}

	batchResetParams := batchResetParamsType{
		reason:               getRequiredOption(c, FlagReason),
		skipCurrentOpen:      c.Bool(FlagSkipCurrentOpen),
		skipCurrentCompleted: c.Bool(FlagSkipCurrentCompleted),
		nonDeterministicOnly: c.Bool(FlagNonDeterministicOnly),
		skipBaseNotCurrent:   c.Bool(FlagSkipBaseIsNotCurrent),
		dryRun:               c.Bool(FlagDryRun),
		resetType:            resetType,
		decisionOffset:       decisionOffset,
		skipSignalReapply:    c.Bool(FlagSkipSignalReapply),
	}

	if inFileName == "" && query == "" {
		ErrorAndExit("Must provide input file or list query to get target workflows to reset", nil)
	}

	wg := &sync.WaitGroup{}

	wes := make(chan types.WorkflowExecution)
	done := make(chan bool)
	for i := 0; i < parallel; i++ {
		wg.Add(1)
		go processResets(c, domain, wes, done, wg, batchResetParams)
	}

	// read excluded workflowIDs
	excludeWIDs := map[string]bool{}
	if excludeFileName != "" {
		excludeWIDs = loadWorkflowIDsFromFile(excludeFileName, separator)
	}
	if excludeQuery != "" {
		excludeWIDs = getAllWorkflowIDsByQuery(c, excludeQuery)
	}

	fmt.Println("num of excluded WorkflowIDs:", len(excludeWIDs))

	// Holds any error that stopped the input-file scan early. It is reported
	// only after the workers have returned, so executions that were already
	// handed off are not cut short by the error exit.
	var scanErr error

	if len(inFileName) > 0 {
		inFile, err := os.Open(inFileName)
		if err != nil {
			ErrorAndExit("Open failed", err)
		}
		defer inFile.Close()
		scanner := bufio.NewScanner(inFile)
		idx := 0
		for scanner.Scan() {
			idx++
			line := strings.TrimSpace(scanner.Text())
			if len(line) == 0 {
				fmt.Printf("line %v is empty, skipped\n", idx)
				continue
			}
			// strings.Split on a non-empty string always yields at least one
			// element, so cols[0] is safe without a length check.
			cols := strings.Split(line, separator)
			fmt.Printf("Start processing line %v ...\n", idx)
			wid := strings.TrimSpace(cols[0])
			rid := ""
			if len(cols) > 1 {
				rid = strings.TrimSpace(cols[1])
			}

			if excludeWIDs[wid] {
				fmt.Println("skip by exclude file: ", wid, rid)
				continue
			}

			wes <- types.WorkflowExecution{
				WorkflowID: wid,
				RunID:      rid,
			}
		}
		// Scan returns false on both EOF and failure (read error, over-long
		// line); only Err distinguishes them. Without this check a truncated
		// read would look like a successful, complete batch.
		scanErr = scanner.Err()
	} else {
		wfClient := getWorkflowClient(c)
		pageSize := 1000
		var nextPageToken []byte
		var result []*types.WorkflowExecutionInfo
		for {
			result, nextPageToken = scanWorkflowExecutions(wfClient, pageSize, nextPageToken, query, c)
			for _, we := range result {
				wid := we.Execution.GetWorkflowID()
				rid := we.Execution.GetRunID()
				if excludeWIDs[wid] {
					fmt.Println("skip by exclude file: ", wid, rid)
					continue
				}

				wes <- types.WorkflowExecution{
					WorkflowID: wid,
					RunID:      rid,
				}
			}

			if nextPageToken == nil {
				break
			}
		}
	}

	close(done)
	fmt.Println("wait for all goroutines...")
	wg.Wait()

	if scanErr != nil {
		ErrorAndExit("Read input file failed", scanErr)
	}
}