nimo-shake/writer/mongodb_community_driver.go (309 lines of code) (raw):

package writer import ( "fmt" utils "nimo-shake/common" "reflect" "context" conf "nimo-shake/configure" "strings" "github.com/aws/aws-sdk-go/service/dynamodb" LOG "github.com/vinllen/log4go" bson2 "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) const ( NumInitialChunks = 1024 ) type MongoCommunityWriter struct { Name string ns utils.NS conn *utils.MongoCommunityConn primaryIndexes []*dynamodb.KeySchemaElement ctx context.Context upsertOption *options.ReplaceOptions } func NewMongoCommunityWriter(name, address string, ns utils.NS) *MongoCommunityWriter { targetConn, err := utils.NewMongoCommunityConn(address, utils.ConnectModePrimary, true) if err != nil { LOG.Error("create mongodb community connection error[%v]", err) return nil } upsertOptions := new(options.ReplaceOptions) upsertOptions.SetUpsert(true) return &MongoCommunityWriter{ Name: name, ns: ns, conn: targetConn, ctx: context.Background(), // default upsertOption: upsertOptions, } } func (mcw *MongoCommunityWriter) String() string { return mcw.Name } func (mcw *MongoCommunityWriter) GetSession() interface{} { return mcw.conn.Client } func (mcw *MongoCommunityWriter) PassTableDesc(tableDescribe *dynamodb.TableDescription) { mcw.primaryIndexes = tableDescribe.KeySchema } func (mcw *MongoCommunityWriter) CreateTable(tableDescribe *dynamodb.TableDescription) error { // parse primary key with sort key allIndexes := tableDescribe.AttributeDefinitions primaryIndexes := tableDescribe.KeySchema globalSecondaryIndexes := tableDescribe.GlobalSecondaryIndexes mcw.primaryIndexes = primaryIndexes LOG.Info("%s table[%s] primary index length: %v", mcw.String(), *tableDescribe.TableName, len(mcw.primaryIndexes)) // parse index type parseMap := utils.ParseIndexType(allIndexes) // create primary key if has if len(primaryIndexes) == 0 { LOG.Info("%s no index found", mcw) return nil } // check if legal if len(primaryIndexes) > 2 { return fmt.Errorf("%s illegal primary index[%v] number, should <= 2", mcw, len(primaryIndexes)) } if conf.Options.FullEnableIndexPrimary { LOG.Info("%s try create primary index", mcw) // create primary index if err := mcw.createPrimaryIndex(primaryIndexes, parseMap); err != nil { return err } // create user index if conf.Options.FullEnableIndexUser { LOG.Info("%s try create user index", mcw) // create user index if err := mcw.createUserIndex(globalSecondaryIndexes, parseMap); err != nil { return err } } } return nil } func (mcw *MongoCommunityWriter) DropTable() error { // error will be depressed when ns not found err := mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).Drop(mcw.ctx) return err } func (mcw *MongoCommunityWriter) WriteBulk(input []interface{}) error { if len(input) == 0 { return nil } // convert input array to models list models := make([]mongo.WriteModel, len(input)) for i := range models { models[i] = &mongo.InsertOneModel{Document: input[i]} LOG.Debug("WriteBulk: %v", input[i]) } _, err := mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).BulkWrite(nil, models) if err != nil { if strings.Contains(err.Error(), "duplicate key error") { LOG.Warn("%s duplicated document found[%v]. reinsert or update", err, mcw) if !conf.Options.FullExecutorInsertOnDupUpdate || len(mcw.primaryIndexes) == 0 { LOG.Error("full.executor.insert_on_dup_update==[%v], primaryIndexes length[%v]", conf.Options.FullExecutorInsertOnDupUpdate, len(mcw.primaryIndexes)) return err } // 1. generate index list indexList := make([]interface{}, len(input)) for i, ele := range input { inputData, ok := ele.(bson2.M) if !ok { inputData = ele.(map[string]interface{}) LOG.Debug("inputData type:%v content:%v", reflect.TypeOf(inputData), inputData) } index := make(bson2.M, len(mcw.primaryIndexes)) for _, primaryIndex := range mcw.primaryIndexes { // currently, we only support convert type == 'convert', so there is no type inside key := *primaryIndex.AttributeName if _, ok := inputData[key]; !ok { LOG.Error("primary key[%v] is not exists on input data[%v]", *primaryIndex.AttributeName, inputData) } else { index[key] = inputData[key] } } indexList[i] = index } LOG.Debug(indexList) return mcw.updateOnInsert(input, indexList) } return fmt.Errorf("%s insert docs with length[%v] into ns[%s] of dest mongo failed[%v]. first doc: %v", mcw, len(input), mcw.ns, err, input[0]) } return nil } func (mcw *MongoCommunityWriter) Close() { mcw.conn.Close() } func (mcw *MongoCommunityWriter) Insert(input []interface{}, index []interface{}) error { // convert input array to models list models := make([]mongo.WriteModel, len(input)) for i := range models { models[i] = &mongo.InsertOneModel{Document: input[i]} } _, err := mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).BulkWrite(nil, models) if err != nil { if utils.MongodbIgnoreError(err, "i", false) { LOG.Warn("%s ignore error[%v] when insert", mcw, err) return nil } // duplicate key if strings.Contains(err.Error(), "duplicate key error") { if conf.Options.IncreaseExecutorInsertOnDupUpdate { LOG.Warn("%s duplicated document found[%v]. reinsert or update", mcw, err) return mcw.updateOnInsert(input, index) } } return err } return nil } func (mcw *MongoCommunityWriter) updateOnInsert(input []interface{}, index []interface{}) error { // upsert one by one for i := range input { LOG.Debug("upsert: selector[%v] update[%v]", index[i], input[i]) // _, err := mw.conn.Session.DB(mw.ns.Database).C(mw.ns.Collection).Upsert(index[i], input[i]) _, err := mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).ReplaceOne(nil, index[i], input[i], mcw.upsertOption) if err != nil { if utils.MongodbIgnoreError(err, "u", true) { LOG.Warn("%s ignore error[%v] when upsert", mcw, err) return nil } if strings.Contains(err.Error(), "duplicate key error") { // ignore duplicate key continue } return err } } return nil } func (mcw *MongoCommunityWriter) Delete(index []interface{}) error { // convert input array to models list models := make([]mongo.WriteModel, len(index)) for i := range models { models[i] = &mongo.DeleteOneModel{Filter: index[i]} } _, err := mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).BulkWrite(nil, models) if err != nil { LOG.Warn(err) // always ignore ns not found error if utils.MongodbIgnoreError(err, "d", true) { LOG.Warn("%s ignore error[%v] when delete", mcw, err) return nil } return err } return nil } func (mcw *MongoCommunityWriter) Update(input []interface{}, index []interface{}) error { models := make([]mongo.WriteModel, len(index)) for i := range models { uom := &mongo.ReplaceOneModel{Filter: index[i], Replacement: input[i]} if conf.Options.IncreaseExecutorUpsert { uom.SetUpsert(true) } models[i] = uom } _, err := mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).BulkWrite(nil, models) if err != nil { LOG.Warn(err) if strings.Contains(err.Error(), "duplicate key error") { return mcw.updateOnInsert(input, index) } return err } return nil } func (mcw *MongoCommunityWriter) createPrimaryIndex(primaryIndexes []*dynamodb.KeySchemaElement, parseMap map[string]string) error { primaryKeyWithType, err := mcw.createSingleIndex(primaryIndexes, parseMap, true) if err != nil { return err } // write shard key if target mongodb is sharding if conf.Options.TargetMongoDBType == utils.TargetMongoDBTypeSharding { res := mcw.conn.Client.Database("admin").RunCommand(nil, bson2.D{ {"enablesharding", mcw.ns.Database}, }) if err := res.Err(); err != nil { if strings.Contains(err.Error(), "sharding already enabled") == false { return fmt.Errorf("enable sharding failed[%v]", err) } LOG.Warn("ns[%s] sharding already enabled: %v", mcw.ns, err) } res = mcw.conn.Client.Database("admin").RunCommand(nil, bson2.D{ {"shardCollection", mcw.ns.Str()}, {"key", bson2.M{primaryKeyWithType: "hashed"}}, {"options", bson2.M{"numInitialChunks": NumInitialChunks}}, }) if err := res.Err(); err != nil { return fmt.Errorf("shard collection[%s] failed[%v]", mcw.ns, err) } } return nil } func (mcw *MongoCommunityWriter) createUserIndex(globalSecondaryIndexes []*dynamodb.GlobalSecondaryIndexDescription, parseMap map[string]string) error { for _, gsi := range globalSecondaryIndexes { primaryIndexes := gsi.KeySchema // duplicate index will be ignored by MongoDB if _, err := mcw.createSingleIndex(primaryIndexes, parseMap, false); err != nil { LOG.Error("ns[%s] create users' single index failed[%v]", mcw.ns, err) return err } } return nil } func (mcw *MongoCommunityWriter) createSingleIndex(primaryIndexes []*dynamodb.KeySchemaElement, parseMap map[string]string, isPrimaryKey bool) (string, error) { primaryKey, sortKey, err := utils.ParsePrimaryAndSortKey(primaryIndexes, parseMap) if err != nil { return "", fmt.Errorf("parse primary and sort key failed[%v]", err) } primaryKeyWithType := mcw.fetchKey(primaryKey, parseMap[primaryKey]) primaryKeyWithType = conf.ConvertIdFunc(primaryKeyWithType) indexList := make([]string, 0, 2) indexList = append(indexList, primaryKeyWithType) if sortKey != "" { indexList = append(indexList, conf.ConvertIdFunc(mcw.fetchKey(sortKey, parseMap[sortKey]))) } LOG.Info("ns[%s] single index[%v] list[%v]", mcw.ns, primaryKeyWithType, indexList) // primary key should be unique unique := isPrimaryKey ctx := context.Background() // create union unique index if len(indexList) >= 2 { LOG.Info("create union-index isPrimary[%v]: %v", isPrimaryKey, indexList) var keysMap bson2.D for _, ele := range indexList { keysMap = append(keysMap, bson2.E{ele, 1}) } indexModel := mongo.IndexModel{ Keys: keysMap, Options: &options.IndexOptions{}, } indexModel.Options.SetUnique(unique) indexModel.Options.SetBackground(true) _, err := mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).Indexes().CreateOne(ctx, indexModel) if err != nil { return "", fmt.Errorf("create primary union unique[%v] index failed[%v]", unique, err) } } var indexType interface{} indexType = "hashed" if conf.Options.TargetMongoDBType == utils.TargetMongoDBTypeReplica { indexType = 1 } if len(indexList) >= 2 { // unique has already be set on the above index unique = false } else if unique { // must be range if only has 1 key indexType = 1 } indexModel := mongo.IndexModel{ Keys: bson2.M{ primaryKeyWithType: indexType, }, Options: &options.IndexOptions{}, } indexModel.Options.SetUnique(unique) indexModel.Options.SetBackground(true) _, err = mcw.conn.Client.Database(mcw.ns.Database).Collection(mcw.ns.Collection).Indexes().CreateOne(ctx, indexModel) if err != nil { return "", fmt.Errorf("create primary[%v] %v index failed[%v]", isPrimaryKey, indexType, err) } return primaryKeyWithType, nil } func (mcw *MongoCommunityWriter) fetchKey(key, tp string) string { switch conf.Options.ConvertType { case utils.ConvertTypeChange: fallthrough case utils.ConvertMTypeChange: fallthrough case utils.ConvertTypeSame: return key case utils.ConvertTypeRaw: return fmt.Sprintf("%s.%s", key, tp) } return "" }