executor/db_writer.go (199 lines of code) (raw):
package executor
import (
	"context"
	"errors"
	"fmt"

	utils "github.com/alibaba/MongoShake/v2/common"
	"github.com/alibaba/MongoShake/v2/oplog"

	nimo "github.com/gugemichael/nimo4go"
	LOG "github.com/vinllen/log4go"
	bson "go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
)
// Field names and server message texts used by the executor.
const (
	// uuidMark is the key of the UUID field ("ui") that RunCommand removes
	// from the entries nested inside an applyOps command.
	uuidMark = "ui"

	// versionMark is the "$v" key.
	// NOTE(review): presumably the version marker found in update documents;
	// it is not referenced in this part of the file, confirm at the use site.
	versionMark = "$v"

	// shardKeyupdateErr is the text of a server error about shard-key updates
	// that must be sent in a write batch of size 1.
	// NOTE(review): presumably matched against write errors elsewhere in the
	// package; confirm at the use site.
	shardKeyupdateErr = "Document shard key value updates that cause the doc to move shards must be sent with write batch of size 1"
)
// BasicWriter is the set of write operations every oplog writer returned by
// NewDbWriter provides (SingleWriter, CommandWriter and BulkWriter).
//
// Each method receives a batch of oplog records and returns a non-nil error
// when the batch could not be applied. The data methods take the target
// database and collection; doCommand takes only the database. metadata is
// passed through to the implementation.
type BasicWriter interface {
// doInsert applies a batch of insert oplogs.
// NOTE(review): dupUpdate presumably asks the writer to fall back to an
// update when an insert hits a duplicate key; confirm in the implementations.
doInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord, dupUpdate bool) error
// doUpdateOnInsert applies insert oplogs as updates; it is the path taken
// when an insert turned out to be duplicated.
// NOTE(review): upsert presumably maps to the update's upsert option; confirm.
doUpdateOnInsert(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error
// doUpdate applies a batch of update oplogs.
// NOTE(review): upsert presumably maps to the update's upsert option; confirm.
doUpdate(database, collection string, metadata bson.E, oplogs []*OplogRecord, upsert bool) error
// doDelete applies a batch of delete oplogs.
doDelete(database, collection string, metadata bson.E, oplogs []*OplogRecord) error
/*
* doCommand applies a batch of command (DDL) oplogs.
*
* Generally speaking, the `applyOps` command of MongoDB should be used to
* replay this data, but doing so makes the oplog written on the target bigger
* than the one read from the source. That raises errors in two cases:
* 1. mongoshake cascade: the oplog grows every time it goes through mongoshake.
* 2. the oplog is close to 16MB (the oplog max threshold): wrapping it in
* `applyOps` pushes it over 16MB, so the target mongodb rejects it.
*/
doCommand(database string, metadata bson.E, oplogs []*OplogRecord) error
}
// NewDbWriter builds the oplog writer that replays operations over conn.
//
// The implementation is chosen in this order:
//   - bulkInsert is false            -> SingleWriter
//   - metadata.Key is "g" (has gid)  -> CommandWriter
//   - anything else                  -> BulkWriter
//
// conn and fullFinishTs are handed to the chosen writer unchanged.
func NewDbWriter(conn *utils.MongoCommunityConn, metadata bson.E, bulkInsert bool, fullFinishTs int64) BasicWriter {
	switch {
	case !bulkInsert:
		// Bulk insertion is disabled.
		return &SingleWriter{conn: conn, fullFinishTs: fullFinishTs}
	case metadata.Key == "g":
		// The oplog carries a gid.
		return &CommandWriter{conn: conn, fullFinishTs: fullFinishTs}
	default:
		// Bulk insertion is enabled and there is no gid.
		return &BulkWriter{conn: conn, fullFinishTs: fullFinishTs}
	}
}
// RunCommand replays one command (DDL) oplog against the target deployment.
//
// database is the database the command is issued on, operation selects how
// log.Object is converted before it is sent, log is the oplog entry and client
// is the connection to the target.
//
// Conversions by operation:
//   - "createIndexes":    rewritten into a createIndexes command with an
//     "indexes" array (see convertCreateIndexesOplog).
//   - "commitIndexBuild": rewritten into a createIndexes command
//     (see convertCommitIndexBuildOplog).
//   - "applyOps":         sent as is, minus filtered keys and the "ui" field
//     of the nested entries (see convertApplyOpsOplog).
//   - "dropDatabase":     the database is dropped.
//   - "create", "collMod", "drop", "deleteIndex", "deleteIndexes",
//     "dropIndex", "dropIndexes", "convertToCapped", "renameCollection",
//     "emptycapped": log.Object is run directly, on the "admin" database when
//     oplog.IsRunOnAdminCommand(operation) says so.
//   - anything else:      the whole oplog is wrapped into an applyOps command.
//
// The "create" and fallback paths rewrite log.Object in place.
//
// It returns the error reported by the server, or a conversion error when the
// oplog object is malformed (empty createIndexes object, non-string collection
// name in commitIndexBuild).
func RunCommand(database, operation string, log *oplog.PartialLog, client *mongo.Client) error {
	// The dump is evaluated right here (defer evaluates its arguments
	// immediately), so the line logged on return shows the oplog as received,
	// before any of the rewriting below.
	defer LOG.Debug("RunCommand run DDL: %v", log.Dump(nil, true))

	dbHandler := client.Database(database)
	LOG.Info("RunCommand run DDL with type[%s]", operation)

	ctx := context.Background()

	switch operation {
	case "createIndexes":
		cmd, err := convertCreateIndexesOplog(log.Object)
		if err != nil {
			return err
		}
		return dbHandler.RunCommand(ctx, cmd).Err()

	case "commitIndexBuild":
		cmd, err := convertCommitIndexBuildOplog(log.Object)
		if err != nil {
			return err
		}
		LOG.Debug("RunCommand commitIndexBuild oplog after conversion[%v]", cmd)
		return dbHandler.RunCommand(ctx, cmd).Err()

	case "applyOps":
		/*
		 * Strictly speaking, the nested applyOps case should be handled, but it
		 * is complicated to fulfill, so "applyOps" is used to run the command
		 * directly.
		 */
		return dbHandler.RunCommand(ctx, convertApplyOpsOplog(log.Object)).Err()

	case "dropDatabase":
		return dbHandler.Drop(ctx)

	case "create", "collMod", "drop", "deleteIndex", "deleteIndexes",
		"dropIndex", "dropIndexes", "convertToCapped", "renameCollection",
		"emptycapped":
		if operation == "create" &&
			oplog.GetKey(log.Object, "autoIndexId") != nil &&
			oplog.GetKey(log.Object, "idIndex") != nil {
			// Both "autoIndexId" and "idIndex" exist: remove "autoIndexId".
			log.Object = oplog.RemoveFiled(log.Object, "autoIndexId")
		}

		target := dbHandler
		if oplog.IsRunOnAdminCommand(operation) {
			target = client.Database("admin")
		}
		return target.RunCommand(ctx, log.Object).Err()

	default:
		LOG.Info("type[%s] not found, use applyOps", operation)

		// Drop the keys rejected by the applyOps filter.
		var rec bson.D
		for _, ele := range log.Object {
			if utils.ApplyOpsFilter(ele.Key) {
				continue
			}
			rec = append(rec, ele)
		}
		log.Object = rec // reset log.Object so the dump below uses the filtered object

		store := bson.D{
			{Key: "applyOps", Value: []bson.D{log.Dump(nil, true)}},
		}
		return dbHandler.RunCommand(ctx, store).Err()
	}
}

// convertCreateIndexesOplog turns the object of a createIndexes oplog into a
// createIndexes command: the first element is kept and every other element is
// moved into the single document of the "indexes" array.
//
// After v3.6 the given oplog should have a uuid when applyOps is run with
// createIndexes, so the oplog is rewritten based on this reference:
// https://docs.mongodb.com/manual/reference/command/createIndexes/#dbcmd.createIndexes
//
// It returns an error when object is empty.
func convertCreateIndexesOplog(object bson.D) (bson.D, error) {
	if len(object) == 0 {
		return nil, fmt.Errorf("createIndexes oplog has an empty object")
	}
	nimo.AssertTrue(object[0].Key == "createIndexes", "should panic when ele.Name != 'createIndexes'")

	// Left nil when there is nothing besides the command key.
	var spec bson.D
	spec = append(spec, object[1:]...)

	return bson.D{
		object[0], // createIndexes
		{Key: "indexes", Value: []bson.D{spec}}, // only has 1 bson.D
	}, nil
}

// convertCommitIndexBuildOplog turns the object of a commitIndexBuild oplog
// into a createIndexes command on the same collection, keeping only the
// "indexes" elements.
//
// If multiple indexes are created, commitIndexBuild only generates one oplog
// while createIndexes generates one oplog per index:
//
//	{ "op" : "c", "ns" : "test.$cmd", "ui" : UUID("617ffe90-6dac-4e71-b570-1825422c1896"),
//	  "o" : { "commitIndexBuild" : "car", "indexBuildUUID" : UUID("4e9b7457-b612-42bb-bbad-bd6e9a2d63a7"),
//	          "indexes" : [
//	              { "v" : 2, "key" : { "count" : 1 }, "name" : "count_1" },
//	              { "v" : 2, "key" : { "type" : 1 }, "name" : "type_1" }
//	          ]},
//	  "ts" : Timestamp(1653620229, 6), "t" : NumberLong(1), "v" : NumberLong(2), "wall" : ISODate("2022-05-27T02:57:09.187Z") }
//
//	CreateIndexes Command: db.car.createIndexes([{"count":1},{"type":1}])
//	{ "ts" : Timestamp(1653620582, 3), ..., "op" : "c", "ns" : "test.$cmd",
//	  "o" : { "createIndexes" : "car", "v" : 2, "key" : { "type" : 1 }, "name" : "type_1" } }
//	{ "ts" : Timestamp(1653620582, 2), ..., "op" : "c", "ns" : "test.$cmd",
//	  "o" : { "createIndexes" : "car", "v" : 2, "key" : { "count" : 1 }, "name" : "count_1" } }
//
// It returns an error when the value of the first element is not a string.
func convertCommitIndexBuildOplog(object bson.D) (bson.D, error) {
	var cmd bson.D
	for i, ele := range object {
		if i == 0 {
			// Check the key first so a wrong oplog type is reported as such.
			nimo.AssertTrue(ele.Key == "commitIndexBuild", "should panic when ele.Name != 'commitIndexBuild'")
			collection, ok := ele.Value.(string)
			if !ok {
				return nil, fmt.Errorf("commitIndexBuild oplog: collection name has type %T, want string", ele.Value)
			}
			cmd = append(cmd, primitive.E{Key: "createIndexes", Value: collection})
			continue
		}
		if ele.Key == "indexes" {
			cmd = append(cmd, primitive.E{Key: "indexes", Value: ele.Value})
		}
	}
	nimo.AssertTrue(len(cmd) >= 2, "indexes must at least have two elements")
	return cmd, nil
}

// convertApplyOpsOplog builds the applyOps command sent to the target: keys
// rejected by utils.ApplyOpsFilter are dropped and the "ui" field is removed
// from the entries under the "applyOps" key.
//
// Array-valued "applyOps" entries are rewritten in place, so the array held by
// object is modified as well.
func convertApplyOpsOplog(object bson.D) bson.D {
	var store bson.D
	for _, ele := range object {
		if utils.ApplyOpsFilter(ele.Key) {
			continue
		}
		if ele.Key == "applyOps" {
			switch v := ele.Value.(type) {
			case primitive.A:
				// primitive.A is a named type, so it does not match the
				// []interface{} case below and needs its own case.
				stripUUIDFromOps(v)
			case []interface{}:
				stripUUIDFromOps(v)
			case bson.D:
				ret := make(bson.D, 0, len(v))
				for _, field := range v {
					if field.Key == uuidMark {
						continue
					}
					ret = append(ret, field)
				}
				ele.Value = ret
			case []bson.M:
				for _, op := range v {
					delete(op, uuidMark)
				}
			}
		}
		store = append(store, ele)
	}
	return store
}

// stripUUIDFromOps removes the "ui" field from every document in ops, in
// place. Elements that are neither bson.D nor bson.M are left untouched.
func stripUUIDFromOps(ops []interface{}) {
	for i, op := range ops {
		switch doc := op.(type) {
		case bson.D:
			ops[i] = oplog.RemoveFiled(doc, uuidMark)
		case bson.M:
			delete(doc, uuidMark)
		}
	}
}
// IgnoreError reports whether err, raised while applying an oplog of type op,
// can be ignored. true means the error can be ignored.
//
// A nil err is always ignorable. Otherwise err (or any error it wraps) must be
// a mongo.ServerError, and the decision depends on op:
//   - "i":  never ignored.
//   - "u":  PathNotViable (28) and KeyNotFound (211), during full sync only.
//   - "ui": DuplicateKey (11000), during full sync only.
//   - "d":  NamespaceNotFound (26).
//   - "c":  NamespaceNotFound (26).
//
// Any other op is never ignored. isFullSyncStage tells whether the full
// synchronization stage is running.
//
// Error codes: https://github.com/mongodb/mongo/blob/master/src/mongo/base/error_codes.yml
func IgnoreError(err error, op string, isFullSyncStage bool) bool {
	const (
		codeNamespaceNotFound = 26
		codePathNotViable     = 28
		codeKeyNotFound       = 211
		codeDuplicateKey      = 11000
	)

	if err == nil {
		return true
	}

	// errors.As also recognizes a server error that a caller has wrapped with
	// extra context, which a plain type assertion would miss.
	var er mongo.ServerError
	if !errors.As(err, &er) {
		return false
	}

	switch op {
	case "i":
		// Duplicate keys on insert are deliberately not ignored here.
		return false
	case "u":
		return isFullSyncStage &&
			(er.HasErrorCode(codePathNotViable) || er.HasErrorCode(codeKeyNotFound))
	case "ui":
		return isFullSyncStage && er.HasErrorCode(codeDuplicateKey)
	case "d", "c":
		return er.HasErrorCode(codeNamespaceNotFound)
	}
	return false
}
// parseLastTimestamp returns the timestamp of the last record in oplogs,
// converted with utils.TimeStampToInt64. It returns 0 for an empty batch.
func parseLastTimestamp(oplogs []*OplogRecord) int64 {
	count := len(oplogs)
	if count == 0 {
		return 0
	}

	last := oplogs[count-1]
	return utils.TimeStampToInt64(last.original.partialLog.Timestamp)
}