nimo-shake/common/mongodb_mgo.go (128 lines of code) (raw):

package utils import ( "time" "fmt" "reflect" "github.com/vinllen/mgo" "github.com/vinllen/mgo/bson" LOG "github.com/vinllen/log4go" "github.com/jinzhu/copier" ) const ( ConnectModePrimary = "primary" ConnectModeSecondaryPreferred = "secondaryPreferred" ConnectModeStandalone = "standalone" OplogNS = "oplog.rs" ) var ( NotFountErr = "not found" NsNotFountErr = "ns not found" ) type NS struct { Database string Collection string } func (ns NS) Str() string { return fmt.Sprintf("%s.%s", ns.Database, ns.Collection) } type MongoConn struct { Session *mgo.Session URL string } func NewMongoConn(url string, connectMode string, timeout bool) (*MongoConn, error) { if connectMode == ConnectModeStandalone { url += "?connect=direct" } session, err := mgo.Dial(url) if err != nil { LOG.Critical("Connect to [%s] failed. %v", url, err) return nil, err } // maximum pooled connections. the overall established sockets // should be lower than this value(will block otherwise) session.SetPoolLimit(256) if timeout { session.SetSocketTimeout(10 * time.Minute) } else { session.SetSocketTimeout(0) } // already ping in the session /*if err := session.Ping(); err != nil { LOG.Critical("Verify ping command to %s failed. %v", url, err) return nil, err }*/ // Switch the session to a eventually behavior. In that case session // may read for any secondary node. default mode is mgo.Strong switch connectMode { case ConnectModePrimary: session.SetMode(mgo.Primary, true) case ConnectModeSecondaryPreferred: session.SetMode(mgo.SecondaryPreferred, true) case ConnectModeStandalone: session.SetMode(mgo.Monotonic, true) default: err = fmt.Errorf("unknown connect mode[%v]", connectMode) return nil, err } LOG.Info("New session to %s successfully", url) return &MongoConn{Session: session, URL: url}, nil } func (conn *MongoConn) Close() { LOG.Info("Close session with %s", conn.URL) conn.Session.Close() } func (conn *MongoConn) IsGood() bool { if err := conn.Session.Ping(); err != nil { return false } return true } func (conn *MongoConn) AcquireReplicaSetName() string { var replicaset struct { Id string `bson:"set"` } if err := conn.Session.DB("admin").Run(bson.M{"replSetGetStatus": 1}, &replicaset); err != nil { LOG.Warn("Replica set name not found in system.replset, %v", err) } return replicaset.Id } func (conn *MongoConn) HasOplogNs() bool { if ns, err := conn.Session.DB("local").CollectionNames(); err == nil { for _, table := range ns { if table == OplogNS { return true } } } return false } func (conn *MongoConn) HasUniqueIndex() bool { checkNs := make([]NS, 0, 128) var databases []string var err error if databases, err = conn.Session.DatabaseNames(); err != nil { LOG.Critical("Couldn't get databases from remote server %v", err) return false } for _, db := range databases { if db != "admin" && db != "local" { coll, _ := conn.Session.DB(db).CollectionNames() for _, c := range coll { if c != "system.profile" { // push all collections checkNs = append(checkNs, NS{Database: db, Collection: c}) } } } } for _, ns := range checkNs { indexes, _ := conn.Session.DB(ns.Database).C(ns.Collection).Indexes() for _, idx := range indexes { // has unique index if idx.Unique { LOG.Info("Found unique index %s on %s.%s in auto shard mode", idx.Name, ns.Database, ns.Collection) return true } } } return false } // first is from dynamo, second is from mongo func CompareBson(first, second bson.M) (bool, error) { v2 := make(bson.M, 0) if err := copier.Copy(&v2, &second); err != nil { return false, fmt.Errorf("copy[%v] failed[%v]", second, err) } if _, ok := v2["_id"]; ok { delete(v2, "_id") } return reflect.DeepEqual(first, v2), nil // return DeepEqual(first, v2), nil }