func StartNamespaceSpecSyncForSharding()

in collector/docsyncer/doc_syncer.go [101:204]


// StartNamespaceSpecSyncForSharding copies the sharding specification found at
// csUrl onto the destination reachable through toConn.
//
// It opens its own connection to csUrl (presumably the source cluster's config
// server — confirm against the caller) and then works in two passes:
//
//  1. For every document in config.databases marked as partitioned and not
//     rejected by the document filter list, it runs "enablesharding" on each
//     transformed database name, unless the destination already reports that
//     database as partitioned.
//  2. For every document in config.collections that is not marked as dropped
//     and not rejected by the filter list, it runs "shardCollection" on the
//     namespace produced by nsTrans, reusing the source shard key and unique flag.
//
// Documents that cannot be decoded are logged and skipped. A shardCollection
// failure is tolerated only when the destination namespace is listed in
// conf.Options.SkipNSShareKeyVerify.
//
// It returns a non-nil error when the source connection cannot be established,
// when either config collection cannot be queried or fully iterated, or when a
// sharding command fails on the destination.
func StartNamespaceSpecSyncForSharding(csUrl string, toConn *utils.MongoCommunityConn,
	nsTrans *transform.NamespaceTransform) error {
	LOG.Info("document syncer namespace spec for sharding begin")

	var fromConn *utils.MongoCommunityConn
	var err error
	if fromConn, err = utils.NewMongoCommunityConn(csUrl, utils.VarMongoConnectModePrimary, true,
		utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault,
		conf.Options.MongoSslRootCaFile); err != nil {
		LOG.Info("Connect to [%s] failed. err[%v]", csUrl, err)
		return err
	}
	defer fromConn.Close()

	filterList := filter.NewDocFilterList()
	dbTrans := transform.NewDBTransform(conf.Options.TransformNamespace)

	type dbSpec struct {
		Db          string `bson:"_id"`
		Partitioned bool   `bson:"partitioned"`
	}
	var docCursor *mongo.Cursor
	// enable sharding for db
	docCursor, err = fromConn.Client.Database("config").Collection("databases").Find(nil, bson.M{})
	if err != nil {
		return err
	}
	// Deferred so the cursor is released on the early error returns below as well.
	defer func() {
		if closeErr := docCursor.Close(nil); closeErr != nil {
			LOG.Critical("Close iterator of config.database failed. %v", closeErr)
		}
	}()
	for docCursor.Next(nil) {
		// Decode into a fresh value each time: the struct decoder is not expected
		// to reset fields that are absent from the document, so a reused variable
		// could carry "partitioned" over from the previous database.
		var dbSpecDoc dbSpec
		err = bson.Unmarshal(docCursor.Current, &dbSpecDoc)
		if err != nil {
			LOG.Error("parse docCursor.Current[%v] failed", docCursor.Current)
			continue
		}
		if !dbSpecDoc.Partitioned {
			continue
		}
		if filterList.IterateFilter(dbSpecDoc.Db + ".$cmd") {
			LOG.Debug("DB is filtered. %v", dbSpecDoc.Db)
			continue
		}
		todbList := dbTrans.Transform(dbSpecDoc.Db)
		for _, todb := range todbList {
			// Fresh per target database for the same stale-field reason as above.
			var todbSpecDoc dbSpec
			err = toConn.Client.Database("config").Collection("databases").FindOne(nil,
				bson.D{{"_id", todb}}).Decode(&todbSpecDoc)
			if err == nil && todbSpecDoc.Partitioned {
				// Destination database is already sharding-enabled.
				continue
			}
			err = toConn.Client.Database("admin").RunCommand(nil,
				bson.D{{"enablesharding", todb}}).Err()
			if err != nil {
				LOG.Critical("Enable sharding for db %v of dest mongodb failed. %v", todb, err)
				return errors.New(fmt.Sprintf("Enable sharding for db %v of dest mongodb failed. %v",
					todb, err))
			}
			LOG.Info("Enable sharding for db %v of dest mongodb successful", todb)
		}
	}
	// Next returning false can mean exhaustion or failure; do not report success
	// when only part of config.databases was processed.
	if err = docCursor.Err(); err != nil {
		LOG.Critical("Iterate config.databases failed. %v", err)
		return err
	}

	type colSpec struct {
		Ns      string    `bson:"_id"`
		Key     *bson.Raw `bson:"key"`
		Unique  bool      `bson:"unique"`
		Dropped bool      `bson:"dropped"`
	}
	var colDocCursor *mongo.Cursor
	// shard collections on the destination
	colDocCursor, err = fromConn.Client.Database("config").Collection(
		"collections").Find(nil, bson.D{})
	if err != nil {
		// Without this check a failed query leaves colDocCursor nil and the loop
		// below would panic.
		return err
	}
	defer func() {
		if closeErr := colDocCursor.Close(nil); closeErr != nil {
			LOG.Critical("Close iterator of config.collections failed. %v", closeErr)
		}
	}()
	for colDocCursor.Next(nil) {
		// Fresh value per document so key/unique/dropped never leak between
		// collections.
		var colSpecDoc colSpec
		err = bson.Unmarshal(colDocCursor.Current, &colSpecDoc)
		if err != nil {
			LOG.Error("parse colDocCursor.Current[%v] failed", colDocCursor.Current)
			continue
		}
		if colSpecDoc.Dropped {
			continue
		}
		if filterList.IterateFilter(colSpecDoc.Ns) {
			LOG.Debug("Namespace is filtered. %v", colSpecDoc.Ns)
			continue
		}
		toNs := nsTrans.Transform(colSpecDoc.Ns)
		err = toConn.Client.Database("admin").RunCommand(nil, bson.D{{"shardCollection", toNs},
			{"key", colSpecDoc.Key}, {"unique", colSpecDoc.Unique}}).Err()
		if err != nil {
			if !in(toNs, conf.Options.SkipNSShareKeyVerify) {
				LOG.Critical("Shard collection for ns %v of dest mongodb failed. %v", toNs, err)
				return errors.New(fmt.Sprintf("Shard collection for ns %v of dest mongodb failed. %v",
					toNs, err))
			}
			// Tolerated by configuration, but it did not succeed: say so instead of
			// logging a success line.
			LOG.Info("Shard collection for ns %v of dest mongodb failed but is skipped. %v", toNs, err)
			continue
		}
		LOG.Info("Shard collection for ns %v of dest mongodb successful", toNs)
	}
	if err = colDocCursor.Err(); err != nil {
		LOG.Critical("Iterate config.collections failed. %v", err)
		return err
	}

	LOG.Info("document syncer namespace spec for sharding successful")
	return nil
}