collector/docsyncer/doc_executor.go (205 lines of code) (raw):

package docsyncer

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	conf "github.com/alibaba/MongoShake/v2/collector/configure"
	utils "github.com/alibaba/MongoShake/v2/common"

	LOG "github.com/vinllen/log4go"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// Process-wide id counters. Both start at -1 so that the first generated id is 0.
var (
	GlobalCollExecutorId int32 = -1
	GlobalDocExecutorId  int32 = -1
)

// CollectionExecutor fans batches of documents for a single namespace out to
// a fixed set of DocExecutor workers through the docBatch channel.
type CollectionExecutor struct {
	// multi executor
	executors []*DocExecutor
	// worker id
	id int
	// mongo url
	mongoUrl    string
	sslRootFile string

	ns utils.NS

	// wg counts batches handed to Sync that have not been processed yet.
	wg sync.WaitGroup
	// batchCount int64

	conn *utils.MongoCommunityConn

	docBatch chan []*bson.Raw

	// not own
	syncer *DBSyncer
}

// GenerateCollExecutorId atomically increments GlobalCollExecutorId and
// returns the new value.
func GenerateCollExecutorId() int {
	return int(atomic.AddInt32(&GlobalCollExecutorId, 1))
}

// NewCollectionExecutor builds a CollectionExecutor for ns. No connection is
// opened and no worker is started until Start is called.
func NewCollectionExecutor(id int, mongoUrl string, ns utils.NS, syncer *DBSyncer, sslRootFile string) *CollectionExecutor {
	return &CollectionExecutor{
		id:          id,
		mongoUrl:    mongoUrl,
		sslRootFile: sslRootFile,
		ns:          ns,
		syncer:      syncer,
		// batchCount: 0,
	}
}

// Start opens the destination connection (skipped when FullSyncExecutorDebug
// is set) and launches FullSyncReaderWriteDocumentParallel worker goroutines
// that consume from docBatch. It returns the connection error, if any.
func (colExecutor *CollectionExecutor) Start() error {
	var err error
	if !conf.Options.FullSyncExecutorDebug {
		writeConcern := utils.ReadWriteConcernDefault
		if conf.Options.FullSyncExecutorMajorityEnable {
			writeConcern = utils.ReadWriteConcernMajority
		}
		if colExecutor.conn, err = utils.NewMongoCommunityConn(colExecutor.mongoUrl,
			utils.VarMongoConnectModePrimary, true,
			utils.ReadWriteConcernDefault, writeConcern, colExecutor.sslRootFile); err != nil {
			return err
		}
	}

	parallel := conf.Options.FullSyncReaderWriteDocumentParallel
	colExecutor.docBatch = make(chan []*bson.Raw, parallel)

	executors := make([]*DocExecutor, parallel)
	for i := range executors {
		// Client is a handle representing a pool of connections and can be used
		// by multiple goroutines, so every worker shares colExecutor.conn.
		// Original author's note on the pool defaults:
		// (min_conn:0 max_conn:100 create_conn_once:2)
		executors[i] = NewDocExecutor(GenerateDocExecutorId(), colExecutor, colExecutor.conn, colExecutor.syncer)
		go executors[i].start()
	}
	colExecutor.executors = executors
	return nil
}

// Sync queues one batch of documents for the workers. Empty batches are
// dropped. The call blocks while the docBatch channel is full.
func (colExecutor *CollectionExecutor) Sync(docs []*bson.Raw) {
	if len(docs) == 0 {
		return
	}

	/*
	 * TODO, waitGroup.Add may overflow, so use atomic to replace waitGroup
	 * // atomic.AddInt64(&colExecutor.batchCount, 1)
	 */
	colExecutor.wg.Add(1)
	colExecutor.docBatch <- docs
}

// Wait blocks until every queued batch has been processed, then closes the
// batch channel (which ends the worker loops) and the connection. It returns
// the first error recorded by any worker, prefixed with the namespace.
func (colExecutor *CollectionExecutor) Wait() error {
	colExecutor.wg.Wait()
	/*for v := atomic.LoadInt64(&colExecutor.batchCount); v != 0; {
		utils.YieldInMs(1000)
		LOG.Info("CollectionExecutor[%v %v] wait batchCount[%v] == 0", colExecutor.ns, colExecutor.id, v)
	}*/

	close(colExecutor.docBatch)
	if !conf.Options.FullSyncExecutorDebug {
		colExecutor.conn.Close()
	}

	for _, exec := range colExecutor.executors {
		if exec.error != nil {
			return fmt.Errorf("sync ns %v failed. %w", colExecutor.ns, exec.error)
		}
	}
	return nil
}

// DocExecutor is a single worker that writes batches received from its
// CollectionExecutor into the destination collection.
type DocExecutor struct {
	// sequence index id in each replayer
	id int
	// colExecutor, not owned
	colExecutor *CollectionExecutor

	conn *utils.MongoCommunityConn

	// error is the first failure returned by doSync; once set, later batches
	// are acknowledged without being written.
	error error

	// not own
	syncer *DBSyncer
}

// GenerateDocExecutorId atomically increments GlobalDocExecutorId and returns
// the new value.
func GenerateDocExecutorId() int {
	return int(atomic.AddInt32(&GlobalDocExecutorId, 1))
}

// NewDocExecutor builds a worker bound to colExecutor that writes through conn.
func NewDocExecutor(id int, colExecutor *CollectionExecutor, conn *utils.MongoCommunityConn,
	syncer *DBSyncer) *DocExecutor {
	return &DocExecutor{
		id:          id,
		colExecutor: colExecutor,
		conn:        conn,
		syncer:      syncer,
	}
}

// String identifies the worker by its id and the namespace it writes to.
func (exec *DocExecutor) String() string {
	return fmt.Sprintf("DocExecutor[%v] collectionExecutor[%v]", exec.id, exec.colExecutor.ns)
}

// start is the worker loop: it consumes batches until docBatch is closed and
// marks each one done on the collection executor's WaitGroup.
func (exec *DocExecutor) start() {
	if !conf.Options.FullSyncExecutorDebug {
		// NOTE(review): exec.conn is the connection shared by all workers of the
		// same CollectionExecutor, and Wait closes it as well -- confirm that
		// Close is safe to call repeatedly.
		defer exec.conn.Close()
	}

	for docs := range exec.colExecutor.docBatch {
		if exec.error == nil {
			if err := exec.doSync(docs); err != nil {
				exec.error = err
				// since v2.4.11: panic directly if meets error
				LOG.Crashf("%s sync failed: %v", exec, err)
			}
		}

		exec.colExecutor.wg.Done()
		// atomic.AddInt64(&exec.colExecutor.batchCount, -1)
	}
}

// doSync inserts docs into the destination namespace with one unordered bulk
// write. It is used by full sync.
//
// Orphan documents are skipped when orphan filtering is enabled. Duplicate-key
// failures are retried as "$set" updates keyed by _id when
// FullSyncExecutorInsertOnDupUpdate is enabled; any other failure is returned.
func (exec *DocExecutor) doSync(docs []*bson.Raw) error {
	if len(docs) == 0 || conf.Options.FullSyncExecutorDebug {
		return nil
	}

	ns := exec.colExecutor.ns

	// kept[i] is the source document behind models[i]. The bulk write reports
	// failures by index into models, which differs from the index into docs as
	// soon as one orphan document has been skipped.
	kept := make([]*bson.Raw, 0, len(docs))
	models := make([]mongo.WriteModel, 0, len(docs))
	filterOrphan := conf.Options.FullSyncExecutorFilterOrphanDocument && exec.syncer.orphanFilter != nil
	for _, doc := range docs {
		if filterOrphan {
			var docData bson.D
			if err := bson.Unmarshal(*doc, &docData); err != nil {
				// Logged only: the filter below still runs on whatever was decoded.
				LOG.Error("doSync do bson unmarshal %v failed. %v", doc, err)
			}
			// judge whether is orphan document, pass if so
			if exec.syncer.orphanFilter.Filter(docData, ns.Database+"."+ns.Collection) {
				LOG.Info("orphan document [%v] filter", doc)
				continue
			}
		}

		kept = append(kept, doc)
		models = append(models, mongo.NewInsertOneModel().SetDocument(doc))
	}

	// Every document was filtered out: BulkWrite rejects an empty model list,
	// so there is nothing to send and nothing to fail on.
	if len(models) == 0 {
		return nil
	}

	// qps limit if enable
	if exec.syncer.qos.Limit > 0 {
		exec.syncer.qos.FetchBucket()
	}

	if conf.Options.LogLevel == utils.VarLogLevelDebug {
		var docBeg, docEnd bson.M
		// Decode errors are ignored on purpose: the values only feed a debug log.
		_ = bson.Unmarshal(*docs[0], &docBeg)
		_ = bson.Unmarshal(*docs[len(docs)-1], &docEnd)
		LOG.Debug("DBSyncer id[%v] doSync BulkWrite with table[%v] batch _id interval [%v, %v]",
			exec.syncer.id, ns, docBeg, docEnd)
	}

	coll := exec.conn.Client.Database(ns.Database).Collection(ns.Collection)
	res, err := coll.BulkWrite(context.Background(), models, options.BulkWrite().SetOrdered(false))
	if err == nil {
		return nil
	}

	var bulkErr mongo.BulkWriteException
	if !errors.As(err, &bulkErr) {
		return fmt.Errorf("bulk run failed[%w]", err)
	}
	// A bulk exception without per-document write errors (for example one that
	// only carries a write-concern error) leaves nothing to retry.
	if len(bulkErr.WriteErrors) == 0 {
		return fmt.Errorf("bulk run failed[%w]", err)
	}

	LOG.Warn("insert docs with length[%v] into ns[%v] of dest mongo failed[%v] res[%v]",
		len(models), ns, bulkErr.WriteErrors[0], res)

	updateModels := make([]mongo.WriteModel, 0, len(bulkErr.WriteErrors))
	for _, wError := range bulkErr.WriteErrors {
		if !utils.DuplicateKey(wError) {
			return fmt.Errorf("bulk run failed[%v]", wError)
		}
		if !conf.Options.FullSyncExecutorInsertOnDupUpdate {
			return fmt.Errorf("duplicate key error[%v], you can clean the document on the target mongodb, "+
				"or enable %v to solve, but full-sync stage needs restart",
				wError, "full_sync.executor.insert_on_dup_update")
		}
		if wError.Index < 0 || wError.Index >= len(kept) {
			return fmt.Errorf("duplicate key error[%v], write error index[%v] out of range", wError, wError.Index)
		}

		dupDocument := *kept[wError.Index]

		// Build the {_id: ...} filter from the document's own _id element.
		var updateFilter bson.D
		found := false
		var docData bson.D
		if err := bson.Unmarshal(dupDocument, &docData); err == nil {
			for _, elem := range docData {
				if elem.Key == "_id" {
					updateFilter = bson.D{elem}
					found = true
					break
				}
			}
		}
		if !found {
			return fmt.Errorf("duplicate key error[%v], can't get _id from document", wError)
		}

		updateModels = append(updateModels, mongo.NewUpdateOneModel().
			SetFilter(updateFilter).
			SetUpdate(bson.D{{Key: "$set", Value: dupDocument}}))
	}

	// updateModels is non-empty here: WriteErrors is non-empty and every loop
	// iteration either returned or appended one model.
	if _, err := coll.BulkWrite(context.Background(), updateModels, options.BulkWrite().SetOrdered(false)); err != nil {
		return fmt.Errorf("bulk run updateForInsert failed[%w]", err)
	}
	LOG.Debug("updateForInsert succ updateModels.len:%d updateModules[0]:%v\n", len(updateModels), updateModels[0])

	return nil
}