in collector/docsyncer/doc_syncer.go [101:204]
// StartNamespaceSpecSyncForSharding replays the sharding layout recorded in the
// "config" database reachable through csUrl onto the destination toConn.
//
// It runs two phases in order:
//  1. every entry of config.databases flagged as partitioned gets
//     "enablesharding" issued on the destination for each database name
//     produced by the configured DB transform;
//  2. every non-dropped entry of config.collections gets "shardCollection"
//     issued on the destination, with the source's key and unique flag, for
//     the namespace produced by nsTrans.
//
// Databases and namespaces matched by the document filter list are skipped.
//
// Parameters:
//   - csUrl:   address of the MongoDB deployment whose config database is read.
//   - toConn:  open connection to the destination; it is not closed here.
//   - nsTrans: maps a source namespace to its destination namespace.
//
// It returns the first error that aborts the sync (connection failure, query
// failure, cursor iteration failure or a rejected sharding command), or nil
// once both phases complete.
func StartNamespaceSpecSyncForSharding(csUrl string, toConn *utils.MongoCommunityConn,
	nsTrans *transform.NamespaceTransform) error {
	LOG.Info("document syncer namespace spec for sharding begin")

	var fromConn *utils.MongoCommunityConn
	var err error
	if fromConn, err = utils.NewMongoCommunityConn(csUrl, utils.VarMongoConnectModePrimary, true,
		utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault,
		conf.Options.MongoSslRootCaFile); err != nil {
		LOG.Info("Connect to [%s] failed. err[%v]", csUrl, err)
		return err
	}
	defer fromConn.Close()

	// One filter list is shared by both phases, as in the original flow.
	filterList := filter.NewDocFilterList()
	isFiltered := func(ns string) bool {
		return filterList.IterateFilter(ns)
	}

	if err = syncShardedDatabaseSpecs(fromConn, toConn, isFiltered); err != nil {
		return err
	}
	if err = syncShardedCollectionSpecs(fromConn, toConn, nsTrans, isFiltered); err != nil {
		return err
	}

	LOG.Info("document syncer namespace spec for sharding successful")
	return nil
}

// syncShardedDatabaseSpecs enables sharding on the destination for every
// database that the source's config.databases collection marks as partitioned.
func syncShardedDatabaseSpecs(fromConn, toConn *utils.MongoCommunityConn,
	isFiltered func(ns string) bool) error {
	type dbSpec struct {
		Db          string `bson:"_id"`
		Partitioned bool   `bson:"partitioned"`
	}

	dbTrans := transform.NewDBTransform(conf.Options.TransformNamespace)

	var docCursor *mongo.Cursor
	var err error
	docCursor, err = fromConn.Client.Database("config").Collection("databases").Find(nil, bson.M{})
	if err != nil {
		return err
	}
	// Deferred so the cursor is released on every exit path, including the
	// early return taken when a destination command fails.
	defer func() {
		if closeErr := docCursor.Close(nil); closeErr != nil {
			LOG.Critical("Close iterator of config.database failed. %v", closeErr)
		}
	}()

	for docCursor.Next(nil) {
		// A fresh value per document, so a field absent from this document
		// cannot inherit the value decoded from the previous one.
		var srcSpec dbSpec
		if err = bson.Unmarshal(docCursor.Current, &srcSpec); err != nil {
			LOG.Error("parse docCursor.Current[%v] failed", docCursor.Current)
			continue
		}
		if !srcSpec.Partitioned {
			continue
		}
		if isFiltered(srcSpec.Db + ".$cmd") {
			LOG.Debug("DB is filtered. %v", srcSpec.Db)
			continue
		}

		for _, todb := range dbTrans.Transform(srcSpec.Db) {
			// Fresh per target for the same reason as srcSpec: a stale
			// Partitioned=true would wrongly skip enablesharding.
			var dstSpec dbSpec
			err = toConn.Client.Database("config").Collection("databases").FindOne(nil,
				bson.D{{"_id", todb}}).Decode(&dstSpec)
			if err == nil && dstSpec.Partitioned {
				// Destination already reports this database as sharded.
				continue
			}

			err = toConn.Client.Database("admin").RunCommand(nil,
				bson.D{{"enablesharding", todb}}).Err()
			if err != nil {
				LOG.Critical("Enable sharding for db %v of dest mongodb failed. %v", todb, err)
				return errors.New(fmt.Sprintf("Enable sharding for db %v of dest mongodb failed. %v",
					todb, err))
			}
			LOG.Info("Enable sharding for db %v of dest mongodb successful", todb)
		}
	}

	// Next returning false can mean exhaustion or failure; report the latter.
	return docCursor.Err()
}

// syncShardedCollectionSpecs shards, on the destination, every collection that
// the source's config.collections collection lists as not dropped.
func syncShardedCollectionSpecs(fromConn, toConn *utils.MongoCommunityConn,
	nsTrans *transform.NamespaceTransform, isFiltered func(ns string) bool) error {
	type colSpec struct {
		Ns      string    `bson:"_id"`
		Key     *bson.Raw `bson:"key"`
		Unique  bool      `bson:"unique"`
		Dropped bool      `bson:"dropped"`
	}

	var colDocCursor *mongo.Cursor
	var err error
	colDocCursor, err = fromConn.Client.Database("config").Collection(
		"collections").Find(nil, bson.D{})
	if err != nil {
		// Without this check a failed query would be dereferenced below.
		return err
	}
	defer func() {
		if closeErr := colDocCursor.Close(nil); closeErr != nil {
			LOG.Critical("Close iterator of config.collections failed. %v", closeErr)
		}
	}()

	for colDocCursor.Next(nil) {
		// A fresh value per document, so Key/Unique/Dropped never leak from
		// the previously decoded collection.
		var spec colSpec
		if err = bson.Unmarshal(colDocCursor.Current, &spec); err != nil {
			LOG.Error("parse colDocCursor.Current[%v] failed", colDocCursor.Current)
			continue
		}
		if spec.Dropped {
			continue
		}
		if isFiltered(spec.Ns) {
			LOG.Debug("Namespace is filtered. %v", spec.Ns)
			continue
		}

		toNs := nsTrans.Transform(spec.Ns)
		err = toConn.Client.Database("admin").RunCommand(nil, bson.D{{"shardCollection", toNs},
			{"key", spec.Key}, {"unique", spec.Unique}}).Err()
		if err != nil {
			if !in(toNs, conf.Options.SkipNSShareKeyVerify) {
				LOG.Critical("Shard collection for ns %v of dest mongodb failed. %v", toNs, err)
				return errors.New(fmt.Sprintf("Shard collection for ns %v of dest mongodb failed. %v",
					toNs, err))
			}
			// The namespace is listed in SkipNSShareKeyVerify, so the failure
			// is tolerated; it must not be reported as a success.
			LOG.Info("Shard collection for ns %v of dest mongodb failed but is skipped. %v", toNs, err)
			continue
		}
		LOG.Info("Shard collection for ns %v of dest mongodb successful", toNs)
	}

	// Next returning false can mean exhaustion or failure; report the latter.
	return colDocCursor.Err()
}