// executor/db_writer_single.go
package executor
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
"github.com/alibaba/MongoShake/v2/oplog"
utils "github.com/alibaba/MongoShake/v2/common"
LOG "github.com/vinllen/log4go"
)
// SingleWriter executes oplogs against the target MongoDB one document at a
// time through the general single-writer interface (as opposed to bulk/batched
// writers elsewhere in this package).
type SingleWriter struct {
	// mongo connection to the target
	conn *utils.MongoCommunityConn
	// init sync finish timestamp; oplogs at or before this point may have
	// already been applied by the full sync, so some errors are ignorable
	fullFinishTs int64
}
// { "op" : "i", "ns" : "test.c", "ui" : UUID("4654d08e-db1f-4e94-9778-90aeee4feff0"), "o" : { "_id" : ObjectId("627a1f83b95fae5fca006bac"), "a" : 1, "b" : 1, "c" : 1 }, "ts" : Timestamp(1652170627, 2), "t" : NumberLong(1), "wall" : ISODate("2022-05-10T08:17:07.558Z"), "v" : NumberLong(2) }
// doInsert writes each oplog document into database.collection one by one.
// Documents rejected with a duplicate-key error are recorded and, when
// dupUpdate is enabled, re-applied as updates via doUpdateOnInsert.
func (sw *SingleWriter) doInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord, dupUpdate bool) error {
	coll := sw.conn.Client.Database(database).Collection(collection)

	var dupOplogs []*OplogRecord
	for _, record := range oplogs {
		_, err := coll.InsertOne(context.Background(), record.original.partialLog.Object)
		if err == nil {
			LOG.Debug("single_writer: insert %v", record.original.partialLog)
			continue
		}
		if !utils.DuplicateKey(err) {
			LOG.Error("insert data[%v] failed[%v]", record.original.partialLog.Object, err)
			return err
		}
		// duplicate key: remember it and decide below how to handle
		dupOplogs = append(dupOplogs, record)
	}

	if len(dupOplogs) == 0 {
		return nil
	}

	RecordDuplicatedOplog(sw.conn, collection, dupOplogs)
	// update on duplicated key occur
	if dupUpdate {
		LOG.Info("Duplicated document found. reinsert or update to [%s] [%s]", database, collection)
		return sw.doUpdateOnInsert(database, collection, metadata, dupOplogs, conf.Options.IncrSyncExecutorUpsert)
	}
	return nil
}
// doUpdateOnInsert retries duplicated inserts as updates: each oplog's full
// document is re-applied via {$set: doc}. When upsert is true and the oplog
// carries a DocumentKey, that key is the update filter; otherwise the _id
// extracted from the document is used (an insert oplog must contain _id).
// Errors on oplogs at or before fullFinishTs may be ignored, since the full
// sync may already have applied them.
func (sw *SingleWriter) doUpdateOnInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	type pair struct {
		id    interface{} // update filter: DocumentKey or {_id: ...}
		data  bson.D      // full document to $set
		index int         // position in oplogs, for timestamp lookup on error
	}
	var updates []*pair
	for i, log := range oplogs {
		newObject := log.original.partialLog.Object
		if upsert && len(log.original.partialLog.DocumentKey) > 0 {
			updates = append(updates, &pair{id: log.original.partialLog.DocumentKey, data: newObject, index: i})
		} else {
			if upsert {
				LOG.Warn("doUpdateOnInsert runs upsert but lack documentKey: %v", log.original.partialLog)
			}
			// insert must have _id
			if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil {
				updates = append(updates, &pair{id: bson.D{{"_id", id}}, data: newObject, index: i})
			} else {
				return fmt.Errorf("insert on duplicated update _id look up failed. %v", log.original.partialLog)
			}
		}
		LOG.Debug("single_writer: updateOnInsert %v", log.original.partialLog)
	}

	collectionHandle := sw.conn.Client.Database(database).Collection(collection)
	if upsert {
		for _, update := range updates {
			opts := options.Update().SetUpsert(true)
			res, err := collectionHandle.UpdateOne(context.Background(), update.id,
				bson.D{{"$set", update.data}}, opts)
			if err != nil {
				LOG.Warn("upsert _id[%v] with data[%v] meets err[%v] res[%v], try to solve",
					update.id, update.data, err, res)
				// error can be ignored(insert fail & oplog is before full end)
				if utils.DuplicateKey(err) &&
					utils.TimeStampToInt64(oplogs[update.index].original.partialLog.Timestamp) <= sw.fullFinishTs {
					continue
				}
				LOG.Error("upsert _id[%v] with data[%v] failed[%v]", update.id, update.data, err)
				return err
			}
			if res != nil {
				if res.MatchedCount != 1 && res.UpsertedCount != 1 {
					return fmt.Errorf("Update fail(MatchedCount:%d ModifiedCount:%d UpsertedCount:%d) upsert _id[%v] with data[%v]",
						res.MatchedCount, res.ModifiedCount, res.UpsertedCount, update.id, update.data)
				}
			}
		}
	} else {
		for _, update := range updates {
			res, err := collectionHandle.UpdateOne(context.Background(), update.id,
				bson.D{{"$set", update.data}}, nil)
			// duplicate-key errors are deliberately tolerated here; everything
			// else goes through the ignore-check below
			if err != nil && utils.DuplicateKey(err) == false {
				LOG.Warn("update _id[%v] with data[%v] meets err[%v] res[%v], try to solve",
					update.id, update.data, err, res)
				// error can be ignored. Use update.index (as the upsert branch
				// does) so the timestamp belongs to this update's source oplog.
				if IgnoreError(err, "u",
					utils.TimeStampToInt64(oplogs[update.index].original.partialLog.Timestamp) <= sw.fullFinishTs) {
					continue
				}
				LOG.Error("update _id[%v] with data[%v] failed[%v]", update.id, update.data, err.Error())
				return err
			}
			if res != nil {
				if res.MatchedCount != 1 {
					return fmt.Errorf("Update fail(MatchedCount:%d, ModifiedCount:%d) old-data[%v] with new-data[%v]",
						res.MatchedCount, res.ModifiedCount, update.id, update.data)
				}
			}
		}
	}
	return nil
}
/*
replace(update all):
db.c.insert({"a":1,"b":1,"c":1}) + db.c.update({"a":1}, {"b":2})
{ "op" : "u", "ns" : "test.c", "ui" : UUID("4654d08e-db1f-4e94-9778-90aeee4feff0"), "o" : { "_id" : ObjectId("627a2492b95fae5fca006bad"), "b" : 2 }, "o2" : { "_id" : ObjectId("627a2492b95fae5fca006bad") }, "ts" : Timestamp(1652171939, 1), "t" : NumberLong(1), "wall" : ISODate("2022-05-10T08:38:59.701Z"), "v" : NumberLong(2) }
updateOne:
db.c.insert({"a":1,"b":1,"c":1}) + db.c.updateOne({"a":1}, {"$set":{"b":2}})
{ "op" : "u", "ns" : "test.c", "ui" : UUID("4654d08e-db1f-4e94-9778-90aeee4feff0"), "o" : { "$v" : 1, "$set" : { "b" : 3 }, "$unset" : { "c" : true } }, "o2" : { "_id" : ObjectId("627a1f83b95fae5fca006bac") }, "ts" : Timestamp(1652170892, 1), "t" : NumberLong(1), "wall" : ISODate("2022-05-10T08:21:32.695Z"), "v" : NumberLong(2) }
*/
// doUpdate applies update oplogs one by one. Oplogs whose Object contains
// $-prefixed operators are partial updates run through UpdateOne (v2 diff
// oplogs are first converted to normal update documents); all others are full
// document replacements run through ReplaceOne. When upsert is enabled the
// DocumentKey (when present) is preferred over Query as the filter. Ignorable
// errors (per IgnoreError, or duplicate keys) do not abort the batch.
func (sw *SingleWriter) doUpdate(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	collectionHandle := sw.conn.Client.Database(database).Collection(collection)
	for _, log := range oplogs {
		var update interface{}
		var err error
		var res *mongo.UpdateResult
		updateCmd := "update"
		LOG.Debug("single_writer: org_doc %v", log.original.partialLog)
		if oplog.FindFiledPrefix(log.original.partialLog.Object, "$") {
			// operator-style update ($set/$unset/...)
			var oplogErr error
			oplogVer, ok := oplog.GetKey(log.original.partialLog.Object, versionMark).(int32)
			LOG.Debug("single_writer doUpdate: have $, org_object:%v "+
				"object_ver:%v\n", log.original.partialLog.Object, oplogVer)
			if ok && oplogVer == 2 {
				// v2 diff-format oplog: translate to a normal update document
				if update, oplogErr = oplog.DiffUpdateOplogToNormal(log.original.partialLog.Object); oplogErr != nil {
					LOG.Error("doUpdate run Faild err[%v] org_doc[%v]", oplogErr, log.original.partialLog)
					return oplogErr
				}
			} else {
				// strip the version marker before applying as-is
				log.original.partialLog.Object = oplog.RemoveFiled(log.original.partialLog.Object, versionMark)
				update = log.original.partialLog.Object
			}
			opts := options.Update()
			if upsert {
				opts.SetUpsert(true)
			}
			if upsert && len(log.original.partialLog.DocumentKey) > 0 {
				res, err = collectionHandle.UpdateOne(context.Background(), log.original.partialLog.DocumentKey,
					update, opts)
			} else {
				if upsert {
					LOG.Warn("doUpdate runs upsert but lack documentKey: %v", log.original.partialLog)
				}
				res, err = collectionHandle.UpdateOne(context.Background(), log.original.partialLog.Query,
					update, opts)
			}
		} else {
			// full-document replacement
			update = log.original.partialLog.Object
			opts := options.Replace()
			if upsert {
				opts.SetUpsert(true)
			}
			if upsert && len(log.original.partialLog.DocumentKey) > 0 {
				res, err = collectionHandle.ReplaceOne(context.Background(), log.original.partialLog.DocumentKey,
					update, opts)
			} else {
				res, err = collectionHandle.ReplaceOne(context.Background(), log.original.partialLog.Query,
					update, opts)
			}
			updateCmd = "replace"
		}
		LOG.Debug("single_writer: %s %v aftermodify_doc:%v", updateCmd, update, log.original.partialLog)
		if err != nil {
			// error can be ignored
			if IgnoreError(err, "u",
				utils.TimeStampToInt64(log.original.partialLog.Timestamp) <= sw.fullFinishTs) {
				continue
			}
			if utils.DuplicateKey(err) {
				RecordDuplicatedOplog(sw.conn, collection, oplogs)
				continue
			}
			LOG.Error("doUpdate[upsert] old-data[%v] with new-data[%v] failed[%v]",
				log.original.partialLog.Query, log.original.partialLog.Object, err)
			return err
		}
		if res != nil {
			if upsert {
				if res.MatchedCount != 1 && res.UpsertedCount != 1 {
					return fmt.Errorf("Update fail(MatchedCount:%d ModifiedCount:%d UpsertedCount:%d) old-data[%v] with new-data[%v]",
						res.MatchedCount, res.ModifiedCount, res.UpsertedCount,
						log.original.partialLog.Query, log.original.partialLog.Object)
				}
			} else {
				// fixed format string: it previously printed MatchedCount twice
				// (copy-paste from the upsert branch)
				if res.MatchedCount != 1 {
					return fmt.Errorf("Update fail(MatchedCount:%d ModifiedCount:%d) old-data[%v] with new-data[%v]",
						res.MatchedCount, res.ModifiedCount,
						log.original.partialLog.Query, log.original.partialLog.Object)
				}
			}
		}
		LOG.Debug("single_writer: aftermodify_doc %v %s[%v]", log.original.partialLog, updateCmd, update)
	}
	return nil
}
// { "op" : "d", "ns" : "test.c", "ui" : UUID("4654d08e-db1f-4e94-9778-90aeee4feff0"), "o" : { "_id" : ObjectId("627a1f83b95fae5fca006bac") }, "ts" : Timestamp(1652171085, 1), "t" : NumberLong(1), "wall" : ISODate("2022-05-10T08:24:45.828Z"), "v" : NumberLong(2) }
// doDelete removes each oplog's document from database.collection using the
// oplog Object (which for a delete oplog holds the document key) as the filter.
// A delete that matches nothing is not an error (ErrNotFound is ignored).
func (sw *SingleWriter) doDelete(database, collection string, metadata bson.E, oplogs []*OplogRecord) error {
	collectionHandle := sw.conn.Client.Database(database).Collection(collection)
	for _, log := range oplogs {
		// ignore ErrNotFound
		_, err := collectionHandle.DeleteOne(context.Background(), log.original.partialLog.Object)
		if err != nil {
			// log the filter actually passed to DeleteOne (Object, not Query)
			LOG.Error("delete data[%v] failed[%v]", log.original.partialLog.Object, err)
			return err
		}
		LOG.Debug("single_writer: delete %v", log.original.partialLog)
	}
	return nil
}
// doCommand executes command (op==c) oplogs in order. A command runs only when
// DDL filtering is enabled or it is a recognized data-sync command; "ns not
// found" and other ignorable errors (per IgnoreError, for oplogs at or before
// fullFinishTs) do not abort the batch.
func (sw *SingleWriter) doCommand(database string, metadata bson.E, oplogs []*OplogRecord) error {
	for _, log := range oplogs {
		object := log.original.partialLog.Object
		operation, found := oplog.ExtraCommandName(object)
		shouldRun := conf.Options.FilterDDLEnable || (found && oplog.IsSyncDataCommand(operation))
		if !shouldRun {
			// exec.batchExecutor.ReplMetric.AddFilter(1)
			LOG.Debug("single_writer: command %v", log.original.partialLog)
			continue
		}
		// execute one by one with sequence order
		err := RunCommand(database, operation, log.original.partialLog, sw.conn.Client)
		switch {
		case err == nil:
			LOG.Info("Execute command (op==c) oplog, operation [%s]", operation)
		case err.Error() == "ns not found":
			LOG.Info("Execute command (op==c) oplog, operation [%s], ignore error[ns not found]", operation)
		case IgnoreError(err, "c", utils.TimeStampToInt64(log.original.partialLog.Timestamp) <= sw.fullFinishTs):
			continue
		default:
			return err
		}
		LOG.Debug("single_writer: command %v", log.original.partialLog)
	}
	return nil
}