executor/db_writer_command.go (169 lines of code) (raw):
package executor
import (
"context"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
utils "github.com/alibaba/MongoShake/v2/common"
"github.com/alibaba/MongoShake/v2/oplog"
"go.mongodb.org/mongo-driver/bson"
LOG "github.com/vinllen/log4go"
)
// use run_command to execute command
type CommandWriter struct {
// mongo connection
conn *utils.MongoCommunityConn
// init sync finish timestamp
fullFinishTs int64
}
func (cw *CommandWriter) doInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord,
dupUpdate bool) error {
var inserts []bson.D
for _, log := range oplogs {
newObject := log.original.partialLog.Object
inserts = append(inserts, newObject)
LOG.Debug("command_writer:: insert %v", log.original.partialLog)
}
dbHandle := cw.conn.Client.Database(database)
var err error
insertCmd := bson.D{
{"insert", collection},
{"bypassDocumentValidation", false},
{"documents", inserts},
{"ordered", ExecuteOrdered},
}
if metadata.Key == "g" {
insertCmd = append(insertCmd, metadata)
}
if err = dbHandle.RunCommand(context.Background(), insertCmd).Err(); err == nil {
return nil
}
LOG.Warn("doInsert failed: %v", err)
// error can be ignored
if IgnoreError(err, "i", parseLastTimestamp(oplogs) <= cw.fullFinishTs) {
LOG.Warn("error[%v] can be ignored", err)
return nil
}
if utils.DuplicateKey(err) {
RecordDuplicatedOplog(cw.conn, collection, oplogs)
// update on duplicated key occur
if dupUpdate {
LOG.Info("Duplicated document found. reinsert or update to [%s] [%s]", database, collection)
return cw.doUpdateOnInsert(database, collection, metadata, oplogs, conf.Options.IncrSyncExecutorUpsert)
}
return nil
}
return err
}
func (cw *CommandWriter) doUpdateOnInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord,
upsert bool) error {
var updates []bson.D
for _, log := range oplogs {
// insert must have _id
if id := oplog.GetKey(log.original.partialLog.Object, ""); id != nil {
updates = append(updates, bson.D{
{"q", bson.M{"_id": id}},
{"u", log.original.partialLog.Object},
{"upsert", upsert},
{"multi", false},
})
} else {
LOG.Warn("Insert on duplicated update _id look up failed. %v", log)
}
LOG.Debug("command_writer:: updateOnInsert %v", log.original.partialLog)
}
var err error
updateCmd := bson.D{
{"update", collection},
{"bypassDocumentValidation", false},
{"updates", updates},
{"ordered", ExecuteOrdered},
}
if metadata.Key == "g" {
updateCmd = append(updateCmd, metadata)
}
if err = cw.conn.Client.Database(database).RunCommand(context.Background(), updateCmd).Err(); err == nil {
return nil
}
LOG.Warn("doUpdateOnInsert failed: %v", err)
// error can be ignored
if IgnoreError(err, "u", parseLastTimestamp(oplogs) <= cw.fullFinishTs) {
return nil
}
// ignore duplicated again
if utils.DuplicateKey(err) {
LOG.Info("Duplicated document found on doUpdateOnInsert [%s] [%s]", database, collection)
return nil
}
return err
}
func (cw *CommandWriter) doUpdate(database, collection string, metadata bson.E, oplogs []*OplogRecord,
upsert bool) error {
var updates []bson.D
for _, log := range oplogs {
log.original.partialLog.Object = oplog.RemoveFiled(log.original.partialLog.Object, versionMark)
updates = append(updates, bson.D{
{"q", log.original.partialLog.Query},
{"u", log.original.partialLog.Object},
{"upsert", upsert},
{"multi", false}})
LOG.Debug("command_writer:: update %v", log.original.partialLog)
}
var err error
updateCmd := bson.D{
{"update", collection},
{"bypassDocumentValidation", false},
{"updates", updates},
{"ordered", ExecuteOrdered},
}
if metadata.Key == "g" {
updateCmd = append(updateCmd, metadata)
}
if err = cw.conn.Client.Database(database).RunCommand(context.Background(), updateCmd).Err(); err == nil {
return nil
}
LOG.Warn("doUpdate failed: %v", err)
// error can be ignored
if IgnoreError(err, "u", parseLastTimestamp(oplogs) <= cw.fullFinishTs) {
return nil
}
// ignore dup error
if utils.DuplicateKey(err) {
RecordDuplicatedOplog(cw.conn, collection, oplogs)
LOG.Info("Duplicated document found on doUpdateOnInsert [%s] [%s]", database, collection)
return nil
}
return err
}
func (cw *CommandWriter) doDelete(database, collection string, metadata bson.E, oplogs []*OplogRecord) error {
var deleted []bson.D
var err error
for _, log := range oplogs {
deleted = append(deleted, bson.D{{"q", log.original.partialLog.Object}, {"limit", 0}})
LOG.Debug("command_writer:: delete %v", log.original.partialLog)
}
deleteCmd := bson.D{
{"delete", collection},
{"deletes", deleted},
{"ordered", ExecuteOrdered},
}
if metadata.Key == "g" {
deleteCmd = append(deleteCmd, metadata)
}
if err = cw.conn.Client.Database(database).RunCommand(context.Background(), deleteCmd).Err(); err == nil {
return nil
}
LOG.Warn("doDelete failed: %v", err)
// error can be ignored
if IgnoreError(err, "d", parseLastTimestamp(oplogs) <= cw.fullFinishTs) {
return nil
}
return err
}
func (cw *CommandWriter) doCommand(database string, metadata bson.E, oplogs []*OplogRecord) error {
var err error
for _, log := range oplogs {
operation, found := oplog.ExtraCommandName(log.original.partialLog.Object)
if conf.Options.FilterDDLEnable || (found && oplog.IsSyncDataCommand(operation)) {
// execute one by one with sequence order
if err = RunCommand(database, operation, log.original.partialLog, cw.conn.Client); err == nil {
LOG.Info("Execute command (op==c) oplog , operation [%s]", conf.Options.FilterDDLEnable,
operation)
} else if IgnoreError(err, "c", parseLastTimestamp(oplogs) <= cw.fullFinishTs) {
LOG.Debug("Ignore error[%v] [%s] [%v]", err, database, log.original.partialLog)
return nil
} else {
return err
}
} else {
// exec.batchExecutor.ReplMetric.AddFilter(1)
}
LOG.Debug("command_writer:: command %v", log.original.partialLog)
}
return nil
}