nimo-full-check/checker/checker.go (81 lines of code) (raw):

package checker import ( "context" "fmt" "sync" conf "nimo-full-check/configure" shakeUtils "nimo-shake/common" shakeFilter "nimo-shake/filter" "github.com/aws/aws-sdk-go/service/dynamodb" LOG "github.com/vinllen/log4go" "go.mongodb.org/mongo-driver/bson" ) type Checker struct { dynamoSession *dynamodb.DynamoDB mongoClient *shakeUtils.MongoCommunityConn } func NewChecker(dynamoSession *dynamodb.DynamoDB, mongoClient *shakeUtils.MongoCommunityConn) *Checker { return &Checker{ dynamoSession: dynamoSession, mongoClient: mongoClient, } } func (c *Checker) Run() error { // fetch all tables LOG.Info("start fetching table list") rawTableList, err := shakeUtils.FetchTableList(c.dynamoSession) if err != nil { return fmt.Errorf("fetch table list failed[%v]", err) } LOG.Info("finish fetching table list: %v", rawTableList) tableList := shakeFilter.FilterList(rawTableList) LOG.Info("filter table list: %v", tableList) // check table exist if err := c.checkTableExist(tableList); err != nil { return fmt.Errorf("check table exist failed[%v]", err) } // reset parallel if needed parallel := conf.Opts.Parallel if parallel > len(tableList) { parallel = len(tableList) } execChan := make(chan string, len(tableList)) for _, table := range tableList { execChan <- table } var wg sync.WaitGroup wg.Add(len(tableList)) for i := 0; i < parallel; i++ { go func(id int) { for { table, ok := <-execChan if !ok { break } LOG.Info("documentChecker[%v] starts checking table[%v]", id, table) dc := NewDocumentChecker(id, table, c.dynamoSession) dc.Run() LOG.Info("documentChecker[%v] finishes checking table[%v]", id, table) wg.Done() } }(i) } wg.Wait() LOG.Info("all documentCheckers finish") return nil } func (c *Checker) checkTableExist(tableList []string) error { collections, err := c.mongoClient.Client.Database(conf.Opts.Id).ListCollectionNames(context.TODO(), bson.M{}) if err != nil { return fmt.Errorf("get target collection names error[%v]", err) } LOG.Info("all table: %v", collections) collectionsMp := shakeUtils.StringListToMap(collections) notExist := make([]string, 0) for _, table := range tableList { if _, ok := collectionsMp[table]; !ok { notExist = append(notExist, table) } } if len(notExist) != 0 { return fmt.Errorf("table not exist on the target side: %v", notExist) } return nil }