in nimo-full-check/checker/checker.go [29:80]
// Run fetches the table list, filters it, verifies the filtered tables via
// checkTableExist, and then runs one DocumentChecker per table using at most
// conf.Opts.Parallel concurrent workers.
//
// It returns an error if fetching the table list or the table-existence check
// fails. The outcome of each individual DocumentChecker.Run call is not
// propagated through the return value here. Run blocks until every worker
// has drained the work queue and exited.
func (c *Checker) Run() error {
	// fetch all tables
	LOG.Info("start fetching table list")
	rawTableList, err := shakeUtils.FetchTableList(c.dynamoSession)
	if err != nil {
		return fmt.Errorf("fetch table list failed[%v]", err)
	}
	LOG.Info("finish fetching table list: %v", rawTableList)

	tableList := shakeFilter.FilterList(rawTableList)
	LOG.Info("filter table list: %v", tableList)

	// check table exist
	if err := c.checkTableExist(tableList); err != nil {
		return fmt.Errorf("check table exist failed[%v]", err)
	}

	// reset parallel if needed: never start more workers than tables
	parallel := conf.Opts.Parallel
	if parallel > len(tableList) {
		parallel = len(tableList)
	}
	// A non-positive setting would start no workers and leave the tables
	// unprocessed; fall back to a single worker when there is work to do.
	if parallel < 1 && len(tableList) > 0 {
		parallel = 1
	}

	// Queue every table up front. The buffer holds the whole list, so these
	// sends never block. Closing the channel lets the workers' range loops
	// terminate once the queue is drained instead of blocking forever.
	execChan := make(chan string, len(tableList))
	for _, table := range tableList {
		execChan <- table
	}
	close(execChan)

	// Track workers rather than tables, so Wait returns only after every
	// goroutine has actually exited.
	var wg sync.WaitGroup
	wg.Add(parallel)
	for i := 0; i < parallel; i++ {
		go func(id int) {
			defer wg.Done()
			for table := range execChan {
				LOG.Info("documentChecker[%v] starts checking table[%v]", id, table)
				dc := NewDocumentChecker(id, table, c.dynamoSession)
				dc.Run()
				LOG.Info("documentChecker[%v] finishes checking table[%v]", id, table)
			}
		}(i)
	}
	wg.Wait()

	LOG.Info("all documentCheckers finish")
	return nil
}