nimo-shake/full-sync/syncer.go (168 lines of code) (raw):

package full_sync

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	utils "nimo-shake/common"
	conf "nimo-shake/configure"
	"nimo-shake/filter"
	"nimo-shake/writer"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	nimo "github.com/gugemichael/nimo4go"
	LOG "github.com/vinllen/log4go"
	bson2 "go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

var (
	// metricNsMapLock guards metricNsMap (entries are added in Start and read by
	// the /progress handler registered in RestAPI).
	metricNsMapLock sync.Mutex
	// metricNsMap is the namespace map: db.collection -> collection metric.
	metricNsMap = make(map[string]*utils.CollectionMetric)
)

// Start runs the full synchronization.
//
// It proceeds in these steps:
//   - Fetch the source table list through dynamoSession and apply the table
//     filter.
//   - Handle tables that already exist on the target (see checkTableExists).
//   - Register a CollectionMetric per table.
//   - Sync every table with a pool of conf.Options.FullConcurrency
//     TableSyncer workers.
//
// It blocks until all tables have been synced. Unrecoverable failures are
// reported through LOG.Crashf.
func Start(dynamoSession *dynamodb.DynamoDB, w writer.Writer) {
	// fetch all tables
	LOG.Info("start fetching table list")
	tableList, err := utils.FetchTableList(dynamoSession)
	if err != nil {
		LOG.Crashf("fetch table list failed[%v]", err)
	}
	LOG.Info("finish fetching table list: %v", tableList)

	tableList = filter.FilterList(tableList)

	if err := checkTableExists(tableList, w); err != nil {
		// A ResourceNotFoundException is tolerated. Presumably a target table
		// vanished between listing and deleting it; TODO confirm.
		if !strings.Contains(err.Error(), "ResourceNotFoundException") {
			LOG.Crashf("check table exists failed[%v]", err)
			return
		}
	}

	LOG.Info("start syncing: %v", tableList)

	metricNsMapLock.Lock()
	for _, table := range tableList {
		metricNsMap[table] = utils.NewCollectionMetric()
	}
	metricNsMapLock.Unlock()

	// The queue is buffered to hold every table, so it is filled and closed
	// up front. Workers then drain it with range and exit by themselves once
	// it is empty.
	fullChan := make(chan string, len(tableList))
	for _, table := range tableList {
		fullChan <- table
	}
	close(fullChan)

	concurrency := int(conf.Options.FullConcurrency)
	if concurrency <= 0 {
		// With zero workers nothing would drain the queue, and the sync
		// would never make progress.
		concurrency = 1
	}

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for table := range fullChan {
				// no need to lock map because the map size won't change
				ts := NewTableSyncer(id, table, metricNsMap[table])
				if ts == nil {
					LOG.Crashf("tableSyncer[%v] create failed", id)
				}

				LOG.Info("tableSyncer[%v] starts sync table[%v]", id, table)
				ts.Sync()
				LOG.Info("tableSyncer[%v] finish sync table[%v]", id, table)

				ts.Close()
			}
		}(i)
	}
	wg.Wait()

	LOG.Info("finish syncing all tables and indexes!")
}

// checkTableExists applies the target.db.exist policy
// (conf.Options.TargetDBExist) to each table in tableList that already exists
// on the target.
//
// The target is w's session, chosen by conf.Options.TargetType. Target types
// other than MongoDB and the Aliyun DynamoDB proxy are not checked. The first
// error encountered is returned.
func checkTableExists(tableList []string, w writer.Writer) error {
	LOG.Info("target.db.exist is set[%v]", conf.Options.TargetDBExist)

	var err error
	switch conf.Options.TargetType {
	case utils.TargetTypeMongo:
		err = handleExistingMongoCollections(tableList, w.GetSession().(*mongo.Client))
	case utils.TargetTypeAliyunDynamoProxy:
		err = handleExistingDynamoProxyTables(tableList, w.GetSession().(*dynamodb.DynamoDB))
	}
	if err != nil {
		return err
	}

	LOG.Info("finish handling table exists")
	return nil
}

// handleExistingMongoCollections applies target.db.exist to collections of
// database conf.Options.Id on a MongoDB target.
//
// The policy is applied as follows:
//   - TargetDBExistDrop: the existing collection is dropped.
//   - TargetDBExistRename: the collection is renamed to "<table>_<now>" with
//     the admin renameCollection command (dropTarget=false).
//   - Any other value: the collection is left untouched.
func handleExistingMongoCollections(tableList []string, client *mongo.Client) error {
	ctx := context.Background()
	db := client.Database(conf.Options.Id)
	// A single timestamp suffix is shared by every collection renamed in this run.
	now := time.Now().Format(utils.GolangSecurityTime)

	collections, err := db.ListCollectionNames(ctx, bson2.M{})
	if err != nil {
		return fmt.Errorf("get target collection names error[%v]", err)
	}
	existing := utils.StringListToMap(collections)

	for _, table := range tableList {
		// only collections that exist on the target mongodb need handling
		if _, ok := existing[table]; !ok {
			continue
		}
		LOG.Info("table[%v] exists", table)

		switch conf.Options.TargetDBExist {
		case utils.TargetDBExistDrop:
			if err := db.Collection(table).Drop(ctx); err != nil {
				return fmt.Errorf("drop target collection[%v] failed[%v]", table, err)
			}
		case utils.TargetDBExistRename:
			fromCollection := fmt.Sprintf("%s.%s", conf.Options.Id, table)
			toCollection := fmt.Sprintf("%s.%s_%v", conf.Options.Id, table, now)
			res := client.Database("admin").RunCommand(ctx, bson2.D{
				{Key: "renameCollection", Value: fromCollection},
				{Key: "to", Value: toCollection},
				{Key: "dropTarget", Value: false},
			})
			if err := res.Err(); err != nil {
				return fmt.Errorf("rename target collection[%v] failed[%v]", table, err)
			}
		default:
			// Keep the existing collection. Reporting it as an error
			// ("collection[%v] exists on the target") is intentionally disabled
			// for MongoDB targets.
		}
	}
	return nil
}

// handleExistingDynamoProxyTables applies target.db.exist on an Aliyun
// DynamoDB-proxy target.
//
// An existing table is deleted when TargetDBExistDrop is configured. For any
// other value, the table's presence is reported as an error.
func handleExistingDynamoProxyTables(tableList []string, svc *dynamodb.DynamoDB) error {
	// dynamo-proxy does not support Limit and ExclusiveStartTableName, so the
	// table list is fetched with a single unpaginated ListTables call.
	out, err := svc.ListTables(&dynamodb.ListTablesInput{})
	if err != nil {
		return fmt.Errorf("list table failed: %v", err)
	}

	collections := make([]string, 0, len(out.TableNames))
	for _, name := range out.TableNames {
		// StringValue avoids a nil-pointer dereference on a malformed response.
		collections = append(collections, aws.StringValue(name))
	}
	existing := utils.StringListToMap(collections)
	LOG.Info("target exist db list: %v", collections)

	for _, table := range tableList {
		// only tables that exist on the target need handling
		if _, ok := existing[table]; !ok {
			continue
		}
		LOG.Info("table[%v] exists, try [%v]", table, conf.Options.TargetDBExist)

		if conf.Options.TargetDBExist != utils.TargetDBExistDrop {
			return fmt.Errorf("collection[%v] exists on the target", table)
		}
		if _, err := svc.DeleteTable(&dynamodb.DeleteTableInput{
			TableName: aws.String(table),
		}); err != nil {
			return fmt.Errorf("drop target collection[%v] failed[%v]", table, err)
		}
	}
	return nil
}

// RestAPI registers the GET "/progress" endpoint on utils.FullSyncHttpApi.
//
// The endpoint reports full-sync progress: the number of collections that are
// waiting, processing and finished, the finished percentage, and each
// collection's metric string.
func RestAPI() {
	type FullSyncInfo struct {
		Progress             string            `json:"progress"`                     // synced_collection_number / total_collection_number
		TotalCollection      int               `json:"total_collection_number"`      // total collection
		FinishedCollection   int               `json:"finished_collection_number"`   // finished
		ProcessingCollection int               `json:"processing_collection_number"` // in processing
		WaitCollection       int               `json:"wait_collection_number"`       // wait start
		CollectionMetric     map[string]string `json:"collection_metric"`            // collection_name -> process
	}

	utils.FullSyncHttpApi.RegisterAPI("/progress", nimo.HttpGet, func([]byte) interface{} {
		metricNsMapLock.Lock()
		defer metricNsMapLock.Unlock()

		ret := FullSyncInfo{
			TotalCollection:  len(metricNsMap),
			CollectionMetric: make(map[string]string, len(metricNsMap)),
		}
		for ns, collectionMetric := range metricNsMap {
			ret.CollectionMetric[ns] = collectionMetric.String()
			switch collectionMetric.CollectionStatus {
			case utils.StatusWaitStart:
				ret.WaitCollection++
			case utils.StatusProcessing:
				ret.ProcessingCollection++
			case utils.StatusFinish:
				ret.FinishedCollection++
			}
		}

		if ret.TotalCollection == 0 {
			ret.Progress = "-%"
		} else {
			ret.Progress = fmt.Sprintf("%.2f%%", float64(ret.FinishedCollection)/float64(ret.TotalCollection)*100)
		}
		return ret
	})
}