executor/db_writer_bulk.go (219 lines of code) (raw):
package executor
import (
"context"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
utils "github.com/alibaba/MongoShake/v2/common"
"github.com/alibaba/MongoShake/v2/oplog"
LOG "github.com/vinllen/log4go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"strings"
)
// use general bulk interface such like Insert/Update/Delete to execute command.
// BulkWriter batches oplogs into mongo-driver WriteModels and applies them with
// unordered BulkWrite calls, falling back to single-document retries on errors.
type BulkWriter struct {
	// mongo connection to the destination
	conn *utils.MongoCommunityConn
	// init sync finish timestamp: oplogs whose timestamp is <= this value were
	// produced during/before the full sync, so some write errors on them can be
	// ignored (see the IgnoreError calls in the do* methods)
	fullFinishTs int64
}
// doInsert writes the oplogs' documents into database.collection with a single
// unordered bulk insert. On duplicate-key errors the duplicates are recorded
// and, when dupUpdate is set, re-applied as updates via doUpdateOnInsert.
// Other errors are returned to the caller.
func (bw *BulkWriter) doInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord, dupUpdate bool) error {
	var models []mongo.WriteModel
	for _, log := range oplogs {
		models = append(models, mongo.NewInsertOneModel().SetDocument(log.original.partialLog.Object))
		LOG.Debug("bulk_writer: insert org_oplog:%v insert_doc:%v",
			log.original.partialLog, log.original.partialLog.Object)
	}

	opts := options.BulkWrite().SetOrdered(false)
	// use an explicit context for consistency with doUpdate/doDelete
	res, err := bw.conn.Client.Database(database).Collection(collection).BulkWrite(context.Background(), models, opts)
	if err != nil {
		// Extract the first write error safely: err may be a plain network/driver
		// error (not a mongo.BulkWriteException), and WriteErrors may be empty —
		// the previous unchecked err.(mongo.BulkWriteException).WriteErrors[0]
		// panicked in both cases.
		var firstErr interface{} = err
		if bwErr, ok := err.(mongo.BulkWriteException); ok && len(bwErr.WriteErrors) > 0 {
			firstErr = bwErr.WriteErrors[0]
		}
		LOG.Warn("insert docs with length[%v] into ns[%v] of dest mongo failed[%v] res[%v]",
			len(models), database+"."+collection, firstErr, res)

		if utils.DuplicateKey(err) {
			RecordDuplicatedOplog(bw.conn, collection, oplogs)
			// update on duplicated key occur
			if dupUpdate {
				LOG.Info("Duplicated document found. reinsert or update to [%s] [%s]", database, collection)
				return bw.doUpdateOnInsert(database, collection, metadata, oplogs, conf.Options.IncrSyncExecutorUpsert)
			}
			return nil
		}
		return err
	}
	return nil
}
// doUpdateOnInsert re-applies insert oplogs as $set updates, used after an
// insert batch hit duplicate keys. When upsert is true and the oplog carries a
// DocumentKey, that key is used as the filter; otherwise the filter falls back
// to the document's _id. Duplicate-key failures are retried one record at a
// time through a single writer; ignorable errors (per IgnoreError during the
// initial-sync window) are swallowed.
func (bw *BulkWriter) doUpdateOnInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	var models []mongo.WriteModel
	for _, log := range oplogs {
		newObject := log.original.partialLog.Object
		if upsert && len(log.original.partialLog.DocumentKey) > 0 {
			models = append(models, mongo.NewUpdateOneModel().
				SetFilter(log.original.partialLog.DocumentKey).
				SetUpdate(bson.D{{"$set", newObject}}).SetUpsert(true))
		} else {
			if upsert {
				LOG.Warn("doUpdateOnInsert runs upsert but lack documentKey: %v", log.original.partialLog)
			}
			// insert must have _id
			if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil {
				model := mongo.NewUpdateOneModel().
					SetFilter(bson.D{{"_id", id}}).
					SetUpdate(bson.D{{"$set", newObject}})
				if upsert {
					model.SetUpsert(true)
				}
				models = append(models, model)
			} else {
				// no _id: cannot build a filter, skip this record
				LOG.Warn("Insert on duplicated update _id look up failed. %v", log)
			}
		}
		LOG.Debug("bulk_writer: updateOnInsert %v", log.original.partialLog)
	}

	// use an explicit context for consistency with doUpdate/doDelete
	res, err := bw.conn.Client.Database(database).Collection(collection).BulkWrite(context.Background(), models, nil)
	if err != nil {
		// parse error
		index, errMsg, dup := utils.FindFirstErrorIndexAndMessageN(err)
		LOG.Error("detail error info with index[%v] msg[%v] dup[%v] res[%v]", index, errMsg, dup, res)

		if utils.DuplicateKey(err) {
			// retry one by one from the first failed model; when the failing
			// index is unknown (-1), retry the whole batch — the previous
			// oplogs[index:] panicked on index == -1
			retryFrom := index
			if retryFrom < 0 {
				retryFrom = 0
			}
			// create single writer to write one by one
			sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
			return sw.doUpdateOnInsert(database, collection, metadata, oplogs[retryFrom:], upsert)
		}

		// error can be ignored
		if IgnoreError(err, "u", parseLastTimestamp(oplogs) <= bw.fullFinishTs) {
			var oplogRecord *OplogRecord
			if index != -1 {
				oplogRecord = oplogs[index]
			}
			LOG.Warn("ignore error[%v] when run operation[%v], initialSync[%v], oplog[%v]",
				err, "u", parseLastTimestamp(oplogs) <= bw.fullFinishTs, oplogRecord)
			return nil
		}

		LOG.Error("doUpdateOnInsert run upsert/update[%v] failed[%v]", upsert, err)
		return err
	}
	return nil
}
/*
replacement oplog:
{"ts":{"T":1664192510,"I":1},"t":1,"h":null,"v":2,"op":"u","ns":"test.car","o":[{"Key":"_id","Value":"63318f67024749a30fc12af6"},{"Key":"b","Value":3}],"o2":[{"Key":"_id","Value":"63318f67024749a30fc12af6"}],"PrevOpTime":null,"ui":{"Subtype":4,"Data":"3p7boGbmTvqYSWp42PaZnw=="}}
*/
// doUpdate applies update ("op" == "u") oplogs with one unordered bulk call.
// Oplogs whose Object has a "$"-prefixed field are modifier updates (a v2
// diff-format payload is first converted to a normal update document); all
// others are full-document replacements. Filters prefer DocumentKey when
// upsert is requested and the key is present, otherwise the oplog's Query.
// Duplicate-key, ignorable, and shard-key-update errors are retried through a
// single writer record by record.
func (bw *BulkWriter) doUpdate(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error {
	var models []mongo.WriteModel
	for _, log := range oplogs {
		var newObject interface{}
		updateCmd := "update"
		LOG.Debug("bulk_writer doUpdate: org_doc:%v", log.original.partialLog)
		if oplog.FindFiledPrefix(log.original.partialLog.Object, "$") {
			var oplogErr error
			oplogVer, ok := oplog.GetKey(log.original.partialLog.Object, versionMark).(int32)
			LOG.Debug("bulk_writer doUpdate: have $, org_object:%v "+
				"object_ver:%v\n", log.original.partialLog.Object, oplogVer)

			if ok && oplogVer == 2 {
				// v2 diff-format oplog: convert to a normal update document
				if newObject, oplogErr = oplog.DiffUpdateOplogToNormal(log.original.partialLog.Object); oplogErr != nil {
					LOG.Error("doUpdate run Faild err[%v] org_doc[%v]", oplogErr, log.original.partialLog)
					return oplogErr
				}
			} else {
				// strip the version marker before applying
				log.original.partialLog.Object = oplog.RemoveFiled(log.original.partialLog.Object, versionMark)
				newObject = log.original.partialLog.Object
			}

			if upsert && len(log.original.partialLog.DocumentKey) > 0 {
				models = append(models, mongo.NewUpdateOneModel().
					SetFilter(log.original.partialLog.DocumentKey).
					SetUpdate(newObject).SetUpsert(true))
			} else {
				if upsert {
					LOG.Warn("doUpdate runs upsert but lack documentKey: %v", log.original.partialLog)
				}

				model := mongo.NewUpdateOneModel().
					SetFilter(log.original.partialLog.Query).
					SetUpdate(newObject)
				if upsert {
					model.SetUpsert(true)
				}
				models = append(models, model)
			}
		} else {
			// no "$" modifier: full-document replacement
			newObject = log.original.partialLog.Object
			if upsert && len(log.original.partialLog.DocumentKey) > 0 {
				models = append(models, mongo.NewReplaceOneModel().
					SetFilter(log.original.partialLog.DocumentKey).
					SetReplacement(log.original.partialLog.Object).
					SetUpsert(true))
			} else {
				model := mongo.NewReplaceOneModel().
					SetFilter(log.original.partialLog.Query).
					SetReplacement(log.original.partialLog.Object)
				if upsert {
					model.SetUpsert(true)
				}
				models = append(models, model)
			}
			updateCmd = "replace"
		}
		LOG.Debug("bulk_writer: %s %v aftermodify_doc:%v", updateCmd, newObject, log.original.partialLog)
	}

	LOG.Debug("bulk_writer: update models len %v", len(models))
	res, err := bw.conn.Client.Database(database).Collection(collection).BulkWrite(
		context.Background(), models, nil)
	if err != nil {
		// parse error
		index, errMsg, dup := utils.FindFirstErrorIndexAndMessageN(err)
		var oplogRecord *OplogRecord
		if index != -1 {
			oplogRecord = oplogs[index]
		}
		// only dereference the failing oplog when the index is known — the
		// previous unconditional *oplogRecord.original.partialLog panicked on
		// index == -1
		var failedLog interface{}
		if oplogRecord != nil {
			failedLog = *oplogRecord.original.partialLog
		}
		LOG.Warn("detail error info with index[%v] msg[%v] dup[%v], isFullSyncStage[%v], oplog[%v] res[%v]",
			index, errMsg, dup, parseLastTimestamp(oplogs) <= bw.fullFinishTs,
			failedLog, res)

		// retry window starts at the failing model; unknown index (-1) retries
		// the whole batch instead of panicking on oplogs[-1:]
		retryFrom := index
		if retryFrom < 0 {
			retryFrom = 0
		}

		if utils.DuplicateKey(err) {
			RecordDuplicatedOplog(bw.conn, collection, oplogs)
			// create single writer to write one by one
			sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
			return sw.doUpdate(database, collection, metadata, oplogs[retryFrom:], upsert)
		}

		// error can be ignored
		if IgnoreError(err, "u", parseLastTimestamp(oplogs) <= bw.fullFinishTs) {
			LOG.Warn("ignore error[%v] when run operation[%v], initialSync[%v]", err, "u",
				parseLastTimestamp(oplogs) <= bw.fullFinishTs)
			// re-run (index, len(oplogs) - 1]
			sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
			return sw.doUpdate(database, collection, metadata, oplogs[index+1:], upsert)
		}

		if strings.Contains(err.Error(), shardKeyupdateErr) {
			LOG.Error("multiUpdateShardKey err_string:%s, index:%d, redo update shardkey singly",
				err.Error(), index)
			sw := NewDbWriter(bw.conn, bson.E{}, false, bw.fullFinishTs)
			return sw.doUpdate(database, collection, metadata, oplogs[retryFrom:], upsert)
		}

		LOG.Error("doUpdate run upsert/update[%v] failed[%v]", upsert, err)
		return err
	}
	return nil
}
// doDelete removes the documents matched by each oplog's Object (used as the
// delete filter) with one unordered bulk call. Errors that IgnoreError deems
// ignorable during the initial-sync window are swallowed; anything else is
// logged and returned.
func (bw *BulkWriter) doDelete(database, collection string, metadata bson.E, oplogs []*OplogRecord) error {
	models := make([]mongo.WriteModel, 0, len(oplogs))
	for _, record := range oplogs {
		models = append(models, mongo.NewDeleteOneModel().SetFilter(record.original.partialLog.Object))
		LOG.Debug("bulk_writer: delete %v", record.original.partialLog)
	}

	bulkOpts := options.BulkWrite().SetOrdered(false)
	coll := bw.conn.Client.Database(database).Collection(collection)
	res, err := coll.BulkWrite(context.Background(), models, bulkOpts)
	if err == nil {
		return nil
	}

	// error can be ignored
	initialSync := parseLastTimestamp(oplogs) <= bw.fullFinishTs
	if IgnoreError(err, "d", initialSync) {
		LOG.Warn("ignore error[%v] when run operation[%v], initialSync[%v]",
			err, "d", initialSync)
		return nil
	}

	LOG.Error("doDelete run delete[%v] failed[%v] res[%v]", models, err, res)
	return err
}
// doCommand applies command ("op" == "c") oplogs one at a time, preserving
// their order. A command runs when conf.Options.FilterDDLEnable is set or when
// it is a recognized sync-data command; "ns not found" and errors accepted by
// IgnoreError during the initial-sync window are treated as success.
// NOTE(review): the FilterDDLEnable polarity (run commands when DDL filtering
// is ENABLED) looks surprising — confirm against the other db writers.
func (bw *BulkWriter) doCommand(database string, metadata bson.E, oplogs []*OplogRecord) error {
	for _, record := range oplogs {
		object := record.original.partialLog.Object
		operation, found := oplog.ExtraCommandName(object)

		if conf.Options.FilterDDLEnable || (found && oplog.IsSyncDataCommand(operation)) {
			// execute one by one with sequence order
			err := RunCommand(database, operation, record.original.partialLog, bw.conn.Client)
			switch {
			case err == nil:
				LOG.Info("Execute command (op==c) oplog, operation [%s]", operation)
			case err.Error() == "ns not found":
				LOG.Info("Execute command (op==c) oplog, operation [%s], ignore error[ns not found]", operation)
			case IgnoreError(err, "c", parseLastTimestamp(oplogs) <= bw.fullFinishTs):
				LOG.Warn("ignore error[%v] when run operation[%v], initialSync[%v]", err, "c", parseLastTimestamp(oplogs) <= bw.fullFinishTs)
				return nil
			default:
				return err
			}
		}
		// filtered commands fall through here uncounted
		// (metric hook was disabled upstream: exec.batchExecutor.ReplMetric.AddFilter(1))

		LOG.Debug("bulk_writer: command %v", record.original.partialLog)
	}
	return nil
}