in oplog/changestram_event.go [77:452]
// ConvertEvent2Oplog decodes one raw BSON change stream event and converts it
// into the equivalent oplog-style PartialLog.
//
// input is the raw BSON of a single change stream event. fulldoc only affects
// "update" events: when it is true and the event carries a non-empty
// fullDocument, the whole document is written through "$set"; otherwise the
// "$set"/"$unset" operators are rebuilt from updateDescription.
//
// An error is returned when input cannot be unmarshalled, when the event is an
// "invalidate" event, when the operation type is unknown, or when
// updateDescription holds values of an unexpected BSON type.
func ConvertEvent2Oplog(input []byte, fulldoc bool) (*PartialLog, error) {
	event := new(Event)
	if err := bson.Unmarshal(input, event); err != nil {
		return nil, fmt.Errorf("unmarshal raw bson[%s] failed: %v", input, err)
	}

	oplog := new(PartialLog)
	// ts
	oplog.Timestamp = event.ClusterTime
	// transaction number
	oplog.TxnNumber = event.TxnNumber
	// lsid
	oplog.LSID = event.LSID
	// documentKey
	if len(event.DocumentKey) > 0 {
		oplog.DocumentKey = event.DocumentKey
	}

	ns := event.Ns

	// do nothing for "g", "uk", "fromMigrate"

	// handle different operation type
	switch event.OperationType {
	case "insert":
		// insert zz.test {"kick":1}
		/* event:
		{
		    "_id" : {
		        "_data" : "825E4FA224000000012B022C0100296E5A100420D9F949CFC7496EA80E32BA633701A846645F696400645E4FA224A6717632D6EE2E850004"
		    },
		    "operationType" : "insert",
		    "clusterTime" : Timestamp(1582277156, 1),
		    "fullDocument" : {
		        "_id" : ObjectId("5e4fa224a6717632d6ee2e85"),
		        "kick" : 1
		    },
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    },
		    "documentKey" : {
		        "_id" : ObjectId("5e4fa224a6717632d6ee2e85")
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582277156, 1),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "i",
		    "ns" : "zz.test",
		    "ui" : UUID("20d9f949-cfc7-496e-a80e-32ba633701a8"),
		    "wall" : ISODate("2020-02-21T09:25:56.570Z"),
		    "o" : {
		        "_id" : ObjectId("5e4fa224a6717632d6ee2e85"),
		        "kick" : 1
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.%s", ns["db"], ns["coll"])
		oplog.Operation = "i"
		oplog.Object = event.FullDocument
	case "delete":
		// remove zz.test {"kick":1}
		/* event:
		{
		    "_id" : {
		        "_data" : "825E537D02000000012B022C0100296E5A1004EE9B60D8845F42FF989D09018A730D6046645F696400645E537CF27DC0F30426F01B770004"
		    },
		    "operationType" : "delete",
		    "clusterTime" : Timestamp(1582529794, 1),
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    },
		    "documentKey" : {
		        "_id" : ObjectId("5e537cf27dc0f30426f01b77")
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582529794, 1),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "d",
		    "ns" : "zz.test",
		    "ui" : UUID("ee9b60d8-845f-42ff-989d-09018a730d60"),
		    "wall" : ISODate("2020-02-24T07:36:34.063Z"),
		    "o" : {
		        "_id" : ObjectId("5e537cf27dc0f30426f01b77")
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.%s", ns["db"], ns["coll"])
		oplog.Operation = "d"
		oplog.Object = event.DocumentKey
	case "replace":
		// db.test.update({"kick":1}, {"kick":10, "ok":true})
		/* event:
		{
		    "_id" : {
		        "_data" : "825E538501000000012B022C0100296E5A1004EE9B60D8845F42FF989D09018A730D6046645F696400645E5384F97DC0F30426F01B790004"
		    },
		    "operationType" : "replace",
		    "clusterTime" : Timestamp(1582531841, 1),
		    "fullDocument" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79"),
		        "kick" : 10,
		        "ok" : true
		    },
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    },
		    "documentKey" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79")
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582531841, 1),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "u",
		    "ns" : "zz.test",
		    "ui" : UUID("ee9b60d8-845f-42ff-989d-09018a730d60"),
		    "o2" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79")
		    },
		    "wall" : ISODate("2020-02-24T08:10:41.636Z"),
		    "o" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79"),
		        "kick" : 10,
		        "ok" : true
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.%s", ns["db"], ns["coll"])
		oplog.Operation = "u"
		oplog.Query = event.DocumentKey
		// NOTE(review): the native oplog above carries the replacement document
		// directly in "o", while this wraps it in "$set". A plain "$set" does not
		// drop fields that the replacement removed - confirm the replayer handles
		// this as intended.
		oplog.Object = bson.D{primitive.E{Key: "$set", Value: event.FullDocument}}
	case "update":
		/*
		 * mgset-xxx:PRIMARY> db.test.find()
		 * { "_id" : ObjectId("5e5384f97dc0f30426f01b79"), "kick" : 10, "ok" : true }
		 * mgset-xxx:PRIMARY> db.test.update({"kick":10}, {$set:{"plus_field":2}, $unset:{"ok":1}})
		 * mgset-xxx:PRIMARY> db.test.find()
		 * { "_id" : ObjectId("5e5384f97dc0f30426f01b79"), "kick" : 10, "plus_field" : 2 }
		 */
		/* event:
		{
		    "_id" : {
		        "_data" : "825E5389D5000000022B022C0100296E5A1004EE9B60D8845F42FF989D09018A730D6046645F696400645E5384F97DC0F30426F01B790004"
		    },
		    "operationType" : "update",
		    "clusterTime" : Timestamp(1582533077, 2),
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    },
		    "documentKey" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79")
		    },
		    "updateDescription" : {
		        "updatedFields" : {
		            "plus_field" : 2
		        },
		        "removedFields" : [ "ok" ]
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582533077, 2),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "u",
		    "ns" : "zz.test",
		    "ui" : UUID("ee9b60d8-845f-42ff-989d-09018a730d60"),
		    "o2" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79")
		    },
		    "wall" : ISODate("2020-02-24T08:31:17.681Z"),
		    "o" : {
		        "$v" : 1,
		        "$unset" : {
		            "ok" : true
		        },
		        "$set" : {
		            "plus_field" : 2
		        }
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.%s", ns["db"], ns["coll"])
		oplog.Operation = "u"
		oplog.Query = event.DocumentKey

		// len() of a nil document is 0, so no separate nil check is needed.
		if fulldoc && len(event.FullDocument) > 0 {
			oplog.Object = bson.D{primitive.E{Key: "$set", Value: event.FullDocument}}
		} else {
			// Looking up a missing key yields a nil interface, which the helper
			// treats as "field absent".
			operators, err := updateOperatorsFromDescription(
				event.UpdateDescription["updatedFields"],
				event.UpdateDescription["removedFields"],
			)
			if err != nil {
				return nil, fmt.Errorf("convert update event[%v] failed: %v", event, err)
			}
			oplog.Object = operators
		}
	case "drop":
		// mgset-xxx:PRIMARY> db.test.drop()
		/* event:
		{
		    "_id" : {
		        "_data" : "825E538DF7000000012B022C0100296E5A1004EE9B60D8845F42FF989D09018A730D6004"
		    },
		    "operationType" : "drop",
		    "clusterTime" : Timestamp(1582534135, 1),
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582534135, 1),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "c",
		    "ns" : "zz.$cmd",
		    "ui" : UUID("ee9b60d8-845f-42ff-989d-09018a730d60"),
		    "o2" : {
		        "numRecords" : 3
		    },
		    "wall" : ISODate("2020-02-24T08:48:55.148Z"),
		    "o" : {
		        "drop" : "test"
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.$cmd", ns["db"])
		oplog.Operation = "c"
		oplog.Object = bson.D{
			primitive.E{
				Key:   "drop",
				Value: ns["coll"],
			},
		}
		// ignore o2
	case "rename":
		// mgset-22785363:PRIMARY> db.test.renameCollection("test_collection_rename")
		/* event:
		{
		    "to" : {
		        "db" : "zz",
		        "coll" : "test_collection_rename"
		    },
		    "_id" : {
		        "_data" : "825E53910A000000012B022C0100296E5A1004DA21CB04A4C846AD8B3C3E8E9314F4B504"
		    },
		    "operationType" : "rename",
		    "clusterTime" : Timestamp(1582534922, 1),
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582534922, 1),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "c",
		    "ns" : "zz.$cmd",
		    "ui" : UUID("da21cb04-a4c8-46ad-8b3c-3e8e9314f4b5"),
		    "wall" : ISODate("2020-02-24T09:02:02.685Z"),
		    "o" : {
		        "renameCollection" : "zz.test",
		        "to" : "zz.test_collection_rename",
		        "stayTemp" : false
		        "dropTarget": UUID("52c1c147-2408-4d96-9d0f-889a759ab079"), // field only exists when the target collection already exists
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.$cmd", ns["db"])
		oplog.Operation = "c"
		oplog.Object = bson.D{ // should enable drop_database option on the replayer by default
			primitive.E{
				Key:   "renameCollection",
				Value: fmt.Sprintf("%s.%s", ns["db"], ns["coll"]),
			},
			primitive.E{
				Key:   "to",
				Value: fmt.Sprintf("%s.%s", event.To["db"], event.To["coll"]),
			},
		}
	case "dropDatabase":
		// mgset-22785363:PRIMARY> db.dropDatabase()
		/* event:
		// before the next event, there'll be several "drop" events for the collections inside the database
		{
		    "_id" : {
		        "_data" : "825E5393A5000000042B022C0100296E04"
		    },
		    "operationType" : "dropDatabase",
		    "clusterTime" : Timestamp(1582535589, 4),
		    "ns" : {
		        "db" : "zz"
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582535589, 4),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "c",
		    "ns" : "zz.$cmd",
		    "wall" : ISODate("2020-02-24T09:13:09.165Z"),
		    "o" : {
		        "dropDatabase" : 1
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.$cmd", ns["db"])
		oplog.Operation = "c"
		oplog.Object = bson.D{
			primitive.E{
				Key:   "dropDatabase",
				Value: 1,
			},
		}
	case "invalidate":
		/*
		 * this case shouldn't happen because we watch the whole MongoDB, so an
		 * error is returned to the caller to make the root cause visible.
		 */
		return nil, fmt.Errorf("invalidate event happen, should be handle manually: %s", event)
	default:
		return nil, fmt.Errorf("unknown event type[%v] org_event[%v]", event.OperationType, event)
	}

	// set default for "o", "o2"
	if oplog.Object == nil {
		oplog.Object = bson.D{}
	}
	if oplog.Query == nil {
		oplog.Query = bson.D{}
	}

	LOG.Debug("ConvertEvent2Oplog Event[%v] to Oplog[%v]", event, oplog)
	return oplog, nil
}

// updateOperatorsFromDescription rebuilds the update operator document from
// the "updatedFields" and "removedFields" values of a change stream
// updateDescription.
//
// A non-empty updatedFields (bson.M or bson.D) becomes a "$set" entry, and a
// non-empty removedFields (primitive.A of strings) becomes a "$unset" entry
// mapping every field name to 1. A nil or empty value contributes nothing.
// Any other type yields an error instead of a type-assertion panic.
func updateOperatorsFromDescription(updatedFields, removedFields interface{}) (bson.D, error) {
	operators := make(bson.D, 0, 2)

	hasUpdated := false
	switch fields := updatedFields.(type) {
	case nil:
		// key absent (or explicit null): nothing to set
	case bson.M:
		hasUpdated = len(fields) > 0
	case bson.D:
		hasUpdated = len(fields) > 0
	default:
		return nil, fmt.Errorf("updatedFields has unexpected type %T", updatedFields)
	}
	if hasUpdated {
		operators = append(operators, primitive.E{
			Key:   "$set",
			Value: updatedFields,
		})
	}

	switch fields := removedFields.(type) {
	case nil:
		// key absent (or explicit null): nothing to unset
	case primitive.A:
		if len(fields) == 0 {
			break
		}
		unset := make(bson.M, len(fields))
		for _, field := range fields {
			name, ok := field.(string)
			if !ok {
				return nil, fmt.Errorf("removedFields element has unexpected type %T", field)
			}
			unset[name] = 1
		}
		operators = append(operators, primitive.E{
			Key:   "$unset",
			Value: unset,
		})
	default:
		return nil, fmt.Errorf("removedFields has unexpected type %T", removedFields)
	}

	return operators, nil
}