in nimo-shake/full-sync/syncer.go [92:198]
// checkTableExists looks up every table in tableList on the target and, when a
// table of the same name is already there, handles it according to
// conf.Options.TargetDBExist.
//
// Behavior per conf.Options.TargetType:
//   - utils.TargetTypeMongo: collections are listed in database
//     conf.Options.Id. An existing collection is dropped for
//     utils.TargetDBExistDrop, renamed to "<table>_<timestamp>" for
//     utils.TargetDBExistRename, and left untouched for any other setting.
//   - utils.TargetTypeAliyunDynamoProxy: tables are listed with one ListTables
//     call. An existing table is deleted for utils.TargetDBExistDrop; any
//     other setting makes the function return an error.
//   - any other target type: nothing is checked.
//
// It returns nil when all tables were handled, or an error when the writer's
// session has an unexpected type, listing fails, a drop/rename fails, or (for
// the dynamo proxy only) a table exists and must not be dropped.
func checkTableExists(tableList []string, w writer.Writer) error {
	LOG.Info("target.db.exist is set[%v]", conf.Options.TargetDBExist)

	switch conf.Options.TargetType {
	case utils.TargetTypeMongo:
		// Use a checked assertion so a writer/target-type mismatch surfaces
		// as an error instead of a panic.
		raw := w.GetSession()
		sess, ok := raw.(*mongo.Client)
		if !ok {
			return fmt.Errorf("target session type[%T] is not a mongo client", raw)
		}

		// One timestamp is shared by every rename done in this call.
		now := time.Now().Format(utils.GolangSecurityTime)

		// NOTE(review): the driver calls below receive a nil context; consider
		// passing context.Background() once "context" is imported in this file.
		db := sess.Database(conf.Options.Id)
		collections, err := db.ListCollectionNames(nil, bson2.M{})
		if err != nil {
			return fmt.Errorf("get target collection names error[%v]", err)
		}
		collectionsMp := utils.StringListToMap(collections)

		for _, table := range tableList {
			if _, exists := collectionsMp[table]; !exists {
				continue
			}
			LOG.Info("table[%v] exists", table)

			switch conf.Options.TargetDBExist {
			case utils.TargetDBExistDrop:
				if err := db.Collection(table).Drop(nil); err != nil {
					return fmt.Errorf("drop target collection[%v] failed[%v]", table, err)
				}
			case utils.TargetDBExistRename:
				// renameCollection takes full namespaces and is run against
				// the admin database.
				fromCollection := fmt.Sprintf("%s.%s", conf.Options.Id, table)
				toCollection := fmt.Sprintf("%s.%s_%v", conf.Options.Id, table, now)
				res := sess.Database("admin").RunCommand(nil, bson2.D{
					{"renameCollection", fromCollection},
					{"to", toCollection},
					{"dropTarget", false},
				})
				if err := res.Err(); err != nil {
					return fmt.Errorf("rename target collection[%v] failed[%v]", table, err)
				}
			default:
				// Deliberately a no-op: for MongoDB an existing collection is
				// kept as-is (returning an error here is disabled).
			}
		}

	case utils.TargetTypeAliyunDynamoProxy:
		raw := w.GetSession()
		sess, ok := raw.(*dynamodb.DynamoDB)
		if !ok {
			return fmt.Errorf("target session type[%T] is not a dynamodb client", raw)
		}

		// dynamo-proxy does not support Limit and ExclusiveStartTableName, so
		// the table list is fetched with a single unpaginated request.
		out, err := sess.ListTables(&dynamodb.ListTablesInput{})
		if err != nil {
			return fmt.Errorf("list table failed: %v", err)
		}
		collections := make([]string, 0, len(out.TableNames))
		for _, name := range out.TableNames {
			// TableNames holds pointers; skip nil entries rather than panic.
			if name != nil {
				collections = append(collections, *name)
			}
		}
		collectionsMp := utils.StringListToMap(collections)
		LOG.Info("target exit db list: %v", collections)

		for _, table := range tableList {
			if _, exists := collectionsMp[table]; !exists {
				continue
			}
			LOG.Info("table[%v] exists, try [%v]", table, conf.Options.TargetDBExist)

			// Only dropping is available on this target; everything else is
			// treated as a conflict.
			if conf.Options.TargetDBExist != utils.TargetDBExistDrop {
				return fmt.Errorf("collection[%v] exists on the target", table)
			}
			if _, err := sess.DeleteTable(&dynamodb.DeleteTableInput{
				TableName: aws.String(table),
			}); err != nil {
				return fmt.Errorf("drop target collection[%v] failed[%v]", table, err)
			}
		}
	}

	LOG.Info("finish handling table exists")
	return nil
}