in tools/cli/workflow_commands.go [1509:1640]
// ResetInBatch reads the batch-reset options from the CLI context, collects
// the target workflow executions from either an input file or a list query,
// and hands each target to a pool of processResets goroutines.
//
// Targets whose workflow ID is in the exclusion set (loaded from an exclude
// file or an exclude query, never both) are skipped. Invalid option
// combinations are reported through ErrorAndExit.
func ResetInBatch(c *cli.Context) {
	domain := getRequiredGlobalOption(c, FlagDomain)
	resetType := getRequiredOption(c, FlagResetType)
	decisionOffset := c.Int(FlagDecisionOffset)
	if decisionOffset > 0 {
		ErrorAndExit("Only decision offset <=0 is supported", nil)
	}

	inFileName := c.String(FlagInputFile)
	query := c.String(FlagListQuery)
	excludeFileName := c.String(FlagExcludeFile)
	excludeQuery := c.String(FlagExcludeWorkflowIDByQuery)
	separator := c.String(FlagInputSeparator)

	// The deprecated flag is honored unless it holds 1, in which case the
	// current flag is consulted.
	// NOTE(review): 1 is presumably the deprecated flag's default - confirm.
	parallel := c.Int(FlagParallismDeprecated)
	if parallel == 1 {
		parallel = c.Int(FlagParallelism)
	}
	// Without at least one worker nothing ever receives from the unbuffered
	// channel below, so the first send would block forever.
	if parallel < 1 {
		ErrorAndExit("Parallelism must be at least 1", nil)
	}

	extraForResetType, ok := resetTypesMap[resetType]
	if !ok {
		ErrorAndExit("Not supported reset type", nil)
	} else if len(extraForResetType) > 0 {
		// Called only for its validation effect: the reset type needs an
		// additional option to be present.
		getRequiredOption(c, extraForResetType)
	}

	if excludeFileName != "" && excludeQuery != "" {
		ErrorAndExit("Only one of the excluding option is allowed", nil)
	}

	batchResetParams := batchResetParamsType{
		reason:               getRequiredOption(c, FlagReason),
		skipCurrentOpen:      c.Bool(FlagSkipCurrentOpen),
		skipCurrentCompleted: c.Bool(FlagSkipCurrentCompleted),
		nonDeterministicOnly: c.Bool(FlagNonDeterministicOnly),
		skipBaseNotCurrent:   c.Bool(FlagSkipBaseIsNotCurrent),
		dryRun:               c.Bool(FlagDryRun),
		resetType:            resetType,
		decisionOffset:       decisionOffset,
		skipSignalReapply:    c.Bool(FlagSkipSignalReapply),
	}

	if inFileName == "" && query == "" {
		ErrorAndExit("Must provide input file or list query to get target workflows to reset", nil)
	}

	// Start the workers; they receive targets over wes and are handed done
	// and wg so this function can signal completion and wait for them.
	wg := &sync.WaitGroup{}
	wes := make(chan types.WorkflowExecution)
	done := make(chan bool)
	for i := 0; i < parallel; i++ {
		wg.Add(1)
		go processResets(c, domain, wes, done, wg, batchResetParams)
	}

	// read excluded workflowIDs
	excludeWIDs := map[string]bool{}
	if excludeFileName != "" {
		excludeWIDs = loadWorkflowIDsFromFile(excludeFileName, separator)
	}
	if excludeQuery != "" {
		excludeWIDs = getAllWorkflowIDsByQuery(c, excludeQuery)
	}
	fmt.Println("num of excluded WorkflowIDs:", len(excludeWIDs))

	// readErr holds a failure hit while scanning the input file. It is
	// reported only after the workers have drained, so executions that were
	// already dispatched are not abandoned mid-flight.
	var readErr error

	if len(inFileName) > 0 {
		inFile, err := os.Open(inFileName)
		if err != nil {
			ErrorAndExit("Open failed", err)
		}
		defer inFile.Close()

		scanner := bufio.NewScanner(inFile)
		idx := 0
		for scanner.Scan() {
			idx++
			line := strings.TrimSpace(scanner.Text())
			if len(line) == 0 {
				fmt.Printf("line %v is empty, skipped\n", idx)
				continue
			}
			// line is non-empty here, so Split always yields at least one
			// column: the first is the workflow ID, the optional second the
			// run ID.
			cols := strings.Split(line, separator)
			fmt.Printf("Start processing line %v ...\n", idx)
			wid := strings.TrimSpace(cols[0])
			rid := ""
			if len(cols) > 1 {
				rid = strings.TrimSpace(cols[1])
			}
			if excludeWIDs[wid] {
				fmt.Println("skip by exclude file: ", wid, rid)
				continue
			}
			wes <- types.WorkflowExecution{
				WorkflowID: wid,
				RunID:      rid,
			}
		}
		// Scan returns false both at EOF and on failure (I/O error, line
		// longer than the scanner buffer); only Err tells the two apart.
		readErr = scanner.Err()
	} else {
		wfClient := getWorkflowClient(c)
		pageSize := 1000
		var nextPageToken []byte
		var result []*types.WorkflowExecutionInfo
		// Page through the query results until no continuation token is
		// returned.
		for {
			result, nextPageToken = scanWorkflowExecutions(wfClient, pageSize, nextPageToken, query, c)
			for _, we := range result {
				wid := we.Execution.GetWorkflowID()
				rid := we.Execution.GetRunID()
				if excludeWIDs[wid] {
					fmt.Println("skip by exclude file: ", wid, rid)
					continue
				}
				wes <- types.WorkflowExecution{
					WorkflowID: wid,
					RunID:      rid,
				}
			}
			if nextPageToken == nil {
				break
			}
		}
	}

	// All targets have been handed off; closing done presumably tells the
	// workers to stop once they finish their current item.
	close(done)
	fmt.Println("wait for all goroutines...")
	wg.Wait()

	if readErr != nil {
		ErrorAndExit("Read input file failed", readErr)
	}
}