collector/coordinator/extra_job.go (102 lines of code) (raw):

package coordinator import ( "context" "fmt" "strings" "sync" "time" conf "github.com/alibaba/MongoShake/v2/collector/configure" utils "github.com/alibaba/MongoShake/v2/common" LOG "github.com/vinllen/log4go" ) const ( NameCheckUniqueIndexExistsJob = "CheckIndexExistsJob" ) var ( extraJobList = make(map[int][]extraJob) lock sync.Mutex ) type extraJob interface { Name() string Run() } func AddExtraJob(name string, interval int, input ...interface{}) { LOG.Info("start run extra job[%v] with interval[%v]", name, interval) lock.Lock() defer lock.Unlock() switch name { case NameCheckUniqueIndexExistsJob: collections := input[0].([]string) urls := input[1].([]*utils.MongoSource) extraJobList[interval] = append(extraJobList[interval], NewCheckUniqueIndexExistsJob(interval, collections, urls)) } } func RunExtraJob(RealSourceIncrSync []*utils.MongoSource) error { if len(conf.Options.IncrSyncShardByObjectIdWhiteList) != 0 { AddExtraJob(NameCheckUniqueIndexExistsJob, 10, conf.Options.IncrSyncShardByObjectIdWhiteList, RealSourceIncrSync) } for _, jobList := range extraJobList { for _, job := range jobList { go job.Run() } } return nil } type CheckUniqueIndexExistsJob struct { interval int collections []string urls []*utils.MongoSource } func NewCheckUniqueIndexExistsJob(interval int, collections []string, urls []*utils.MongoSource) *CheckUniqueIndexExistsJob { return &CheckUniqueIndexExistsJob{ interval: interval, collections: collections, urls: urls, } } func (cui *CheckUniqueIndexExistsJob) Name() string { return NameCheckUniqueIndexExistsJob } func (cui *CheckUniqueIndexExistsJob) innerRun() error { var err error conns := make([]*utils.MongoCommunityConn, len(cui.urls)) for i, source := range cui.urls { conns[i], err = utils.NewMongoCommunityConn(source.URL, utils.VarMongoConnectModeSecondaryPreferred, true, utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile) if err != nil { LOG.Error("extra job[%s] connect source[%v] failed: %v", cui.Name(), source.URL, err) return nil } } // parse collection to ns nsList := make([]utils.NS, 0, len(cui.collections)) for _, c := range cui.collections { nsList = append(nsList, utils.NewNS(c)) } for range time.NewTicker(time.Duration(cui.interval) * time.Second).C { LOG.Debug("extra job[%s] check", cui.Name()) for i, source := range cui.urls { for _, ns := range nsList { LOG.Debug("extra job[%s] check[%v]", cui.Name(), ns) cursor, _ := conns[i].Client.Database(ns.Database).Collection(ns.Collection).Indexes().List(nil) for cursor.Next(context.Background()) { name, nErr := cursor.Current.LookupErr("name") unique, uErr := cursor.Current.LookupErr("unique") if uErr == nil && nErr == nil && !strings.HasPrefix(name.String(), "_id") && unique.Boolean() == true { return fmt.Errorf("extra job[%s] with source[%v] query "+ "collection[%s - %s] find unique[%v]", cui.Name(), source.URL, ns.Database, ns.Collection, cursor.Current) } } } } } return nil } func (cui *CheckUniqueIndexExistsJob) Run() { var err error err = cui.innerRun() if err != nil { LOG.Crashf("%v", err) } }