nimo-shake/writer/mongodb_mgo_driver.go (291 lines of code) (raw):

package writer import ( "nimo-shake/common" "fmt" LOG "github.com/vinllen/log4go" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/vinllen/mgo" "github.com/vinllen/mgo/bson" "nimo-shake/configure" "strings" ) // deprecated type MongoWriter struct { Name string ns utils.NS conn *utils.MongoConn primaryIndexes []*dynamodb.KeySchemaElement } func NewMongoWriter(name, address string, ns utils.NS) *MongoWriter { targetConn, err := utils.NewMongoConn(address, utils.ConnectModePrimary, true) if err != nil { LOG.Error("create mongodb connection error[%v]", err) return nil } return &MongoWriter{ Name: name, ns: ns, conn: targetConn, } } func (mw *MongoWriter) String() string { return mw.Name } func (mw *MongoWriter) GetSession() interface{} { return mw.conn.Session } func (mw *MongoWriter) PassTableDesc(tableDescribe *dynamodb.TableDescription) { mw.primaryIndexes = tableDescribe.KeySchema } func (mw *MongoWriter) CreateTable(tableDescribe *dynamodb.TableDescription) error { // parse primary key with sort key allIndexes := tableDescribe.AttributeDefinitions primaryIndexes := tableDescribe.KeySchema globalSecondaryIndexes := tableDescribe.GlobalSecondaryIndexes mw.primaryIndexes = primaryIndexes LOG.Info("%s table[%s] primary index length: %v", mw.String(), *tableDescribe.TableName, len(mw.primaryIndexes)) // parse index type parseMap := utils.ParseIndexType(allIndexes) // create primary key if has if len(primaryIndexes) == 0 { LOG.Info("%s no index found", mw) return nil } // check if legal if len(primaryIndexes) > 2 { return fmt.Errorf("%s illegal primary index[%v] number, should <= 2", mw, len(primaryIndexes)) } if conf.Options.FullEnableIndexPrimary { LOG.Info("%s try create primary index", mw) // create primary index if err := mw.createPrimaryIndex(primaryIndexes, parseMap); err != nil { return err } // create user index if conf.Options.FullEnableIndexUser { LOG.Info("%s try create user index", mw) // create user index if err := mw.createUserIndex(globalSecondaryIndexes, parseMap); err != nil { return err } } } return nil } func (mw *MongoWriter) DropTable() error { err := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).DropCollection() if err != nil && err.Error() == utils.NsNotFountErr { return nil } return err } func (mw *MongoWriter) WriteBulk(input []interface{}) error { if len(input) == 0 { return nil } bulk := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).Bulk() bulk.Unordered() bulk.Insert(input...) if _, err := bulk.Run(); err != nil { if mgo.IsDup(err) { LOG.Warn("%s duplicated document found[%v]. reinsert or update", err, mw) if !conf.Options.FullExecutorInsertOnDupUpdate || len(mw.primaryIndexes) == 0 { LOG.Error("full.executor.insert_on_dup_update==[%v], primaryIndexes length[%v]", conf.Options.FullExecutorInsertOnDupUpdate, len(mw.primaryIndexes)) return err } // 1. generate index list indexList := make([]interface{}, len(input)) for i, ele := range input { inputData := ele.(bson.M) index := make(bson.M, len(mw.primaryIndexes)) for _, primaryIndex := range mw.primaryIndexes { // currently, we only support convert type == 'convert', so there is no type inside key := *primaryIndex.AttributeName if _, ok := inputData[key]; !ok { LOG.Error("primary key[%v] is not exists on input data[%v]", *primaryIndex.AttributeName, inputData) } else { index[key] = inputData[key] } } indexList[i] = index } LOG.Debug(indexList) return mw.updateOnInsert(input, indexList) } return fmt.Errorf("%s insert docs with length[%v] into ns[%s] of dest mongo failed[%v]. first doc: %v", mw, len(input), mw.ns, err, input[0]) } return nil } func (mw *MongoWriter) Close() { mw.conn.Close() } func (mw *MongoWriter) Insert(input []interface{}, index []interface{}) error { bulk := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).Bulk() bulk.Unordered() bulk.Insert(input...) if _, err := bulk.Run(); err != nil { if utils.MongodbIgnoreError(err, "i", false) { LOG.Warn("%s ignore error[%v] when insert", mw, err) return nil } // duplicate key if mgo.IsDup(err) { if conf.Options.IncreaseExecutorInsertOnDupUpdate { LOG.Warn("%s duplicated document found. reinsert or update", mw) return mw.updateOnInsert(input, index) } } return err } return nil } func (mw *MongoWriter) updateOnInsert(input []interface{}, index []interface{}) error { // upsert one by one for i := range input { LOG.Debug("upsert: selector[%v] update[%v]", index[i], input[i]) _, err := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).Upsert(index[i], input[i]) if err != nil { if utils.MongodbIgnoreError(err, "u", true) { LOG.Warn("%s ignore error[%v] when upsert", mw, err) return nil } return err } } return nil } func (mw *MongoWriter) Delete(index []interface{}) error { bulk := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).Bulk() bulk.Unordered() bulk.Remove(index...) if _, err := bulk.Run(); err != nil { LOG.Warn(err) // always ignore ns not found error if utils.MongodbIgnoreError(err, "d", true) { LOG.Warn("%s ignore error[%v] when delete", mw, err) return nil } return err } return nil } func (mw *MongoWriter) Update(input []interface{}, index []interface{}) error { updates := make([]interface{}, 0, len(input)*2) for i := range input { updates = append(updates, index[i]) updates = append(updates, input[i]) } bulk := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).Bulk() if conf.Options.IncreaseExecutorUpsert { bulk.Upsert(updates...) } else { bulk.Update(updates...) } if _, err := bulk.Run(); err != nil { LOG.Warn(err) // parse error idx, _, _ := utils.FindFirstErrorIndexAndMessage(err.Error()) if idx == -1 { return err } // always upsert data if utils.MongodbIgnoreError(err, "u", true) { return mw.updateOnInsert(input[idx:], index[idx:]) } if mgo.IsDup(err) { LOG.Info("error[%v] is dup, ignore", err) return mw.updateOnInsert(input[idx+1:], index[idx+1:]) } return err } return nil } func (mw *MongoWriter) createPrimaryIndex(primaryIndexes []*dynamodb.KeySchemaElement, parseMap map[string]string) error { primaryKeyWithType, err := mw.createSingleIndex(primaryIndexes, parseMap, true) if err != nil { return err } // write shard key if target mongodb is sharding if conf.Options.TargetMongoDBType == utils.TargetMongoDBTypeSharding { err := mw.conn.Session.DB("admin").Run(bson.D{ {Name: "enablesharding", Value: mw.ns.Database}, }, nil) if err != nil { if strings.Contains(err.Error(), "sharding already enabled") == false { return fmt.Errorf("enable sharding failed[%v]", err) } LOG.Warn("ns[%s] sharding already enabled: %v", mw.ns, err) } err = mw.conn.Session.DB("admin").Run(bson.D{ {Name: "shardCollection", Value: mw.ns.Str()}, {Name: "key", Value: bson.M{primaryKeyWithType: "hashed"}}, {Name: "options", Value: bson.M{"numInitialChunks": NumInitialChunks}}, }, nil) if err != nil { return fmt.Errorf("shard collection[%s] failed[%v]", mw.ns, err) } } return nil } func (mw *MongoWriter) createUserIndex(globalSecondaryIndexes []*dynamodb.GlobalSecondaryIndexDescription, parseMap map[string]string) error { for _, gsi := range globalSecondaryIndexes { primaryIndexes := gsi.KeySchema // duplicate index will be ignored by MongoDB if _, err := mw.createSingleIndex(primaryIndexes, parseMap, false); err != nil { LOG.Error("ns[%s] create users' single index failed[%v]", mw.ns, err) return err } } return nil } func (mw *MongoWriter) createSingleIndex(primaryIndexes []*dynamodb.KeySchemaElement, parseMap map[string]string, isPrimaryKey bool) (string, error) { primaryKey, sortKey, err := utils.ParsePrimaryAndSortKey(primaryIndexes, parseMap) if err != nil { return "", fmt.Errorf("parse primary and sort key failed[%v]", err) } primaryKeyWithType := mw.fetchKey(primaryKey, parseMap[primaryKey]) indexList := make([]string, 0, 2) indexList = append(indexList, primaryKeyWithType) if sortKey != "" { indexList = append(indexList, mw.fetchKey(sortKey, parseMap[sortKey])) } LOG.Info("ns[%s] single index[%v] list[%v]", mw.ns, primaryKeyWithType, indexList) // primary key should be unique unique := isPrimaryKey // create union unique index if len(indexList) >= 2 { // write index index := mgo.Index{ Key: indexList, Background: true, Unique: unique, } LOG.Info("create union-index isPrimary[%v]: %v", isPrimaryKey, index.Key) if err := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).EnsureIndex(index); err != nil { return "", fmt.Errorf("create primary union unique[%v] index failed[%v]", unique, err) } } var indexType interface{} indexType = "hashed" if conf.Options.TargetMongoDBType == utils.TargetMongoDBTypeReplica { indexType = 1 } if len(indexList) >= 2 { // unique has already be set on the above index unique = false } else if unique { // must be range if only has 1 key indexType = 1 } doc := bson.D{ {Name: "createIndexes", Value: mw.ns.Collection}, {Name: "indexes", Value: []bson.M{ { "key": bson.M{ primaryKeyWithType: indexType, }, "name": fmt.Sprintf("%s_%v", primaryKeyWithType, indexType), "background": true, "unique": unique, }, }}, } LOG.Info("create index isPrimary[%v]: %v", isPrimaryKey, doc) // create hash key only if err := mw.conn.Session.DB(mw.ns.Database).Run(doc, nil); err != nil { return "", fmt.Errorf("create primary[%v] %v index failed[%v]", isPrimaryKey, indexType, err) } return primaryKeyWithType, nil } func (mw *MongoWriter) fetchKey(key, tp string) string { switch conf.Options.ConvertType { case utils.ConvertTypeChange: fallthrough case utils.ConvertTypeSame: return key case utils.ConvertTypeRaw: return fmt.Sprintf("%s.%s", key, tp) } return "" }