oplog/changestram_event.go (135 lines of code) (raw):
package oplog
import (
"encoding/json"
"fmt"
LOG "github.com/vinllen/log4go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// Key names of the fields found in an oplog entry document.
const (
// field in oplog
OplogTsName = "ts" // timestamp of the entry
OplogOperationName = "op" // operation type, e.g. "i", "u", "d", "c"
OplogGidName = "g" // useless in change stream
OplogNamespaceName = "ns" // namespace, "<db>.<collection>" or "<db>.$cmd"
OplogObjectName = "o" // operation payload
OplogQueryName = "o2" // query part of an update
OplogUniqueKeyName = "uk" // useless in change stream
OplogLsidName = "lsid" // logical session id
OplogFromMigrateName = "fromMigrate"
)
/*
* example:
{
_id : { // stores metadata
"_data" : <BinData|hex string> // resumeToken
},
"operationType" : "<operation>", // insert, delete, replace, update, drop, rename, dropDatabase, invalidate
"fullDocument" : { <document> }, // the document after the modification; appears in insert, replace, delete, update. Equivalent to the original "o" field
"ns" : { // the namespace
"db" : "<database>",
"coll" : "<collection>"
},
"to" : { // only valid when operationType==rename; the namespace after the rename
"db" : "<database>",
"coll" : "<collection>"
},
"documentKey" : { "_id" : <value> }, // equivalent to the "o2" field. Appears in insert, replace, delete, update. Normally contains only _id; for a sharded collection it also contains the shard key.
"updateDescription" : { // only appears when operationType==update; describes an incremental modification, whereas replace is a full replacement.
"updatedFields" : { <document> }, // values of the updated fields
"removedFields" : [ "<field>", ... ] // list of the removed fields
},
"FullDocument" : { // never nil
"fullDocument" : { <document> }, // "updateLookup" when full_document is enabled, otherwise "default"
}
"clusterTime" : <Timestamp>, // equivalent to the "ts" field
"txnNumber" : <NumberLong>, // equivalent to txnNumber in the oplog; only appears inside a transaction. The transaction number increases monotonically within a transaction
"lsid" : { // equivalent to the "lsid" field; only appears inside a transaction. Logical session id: the id of the session the request belongs to.
"id" : <UUID>,
"uid" : <BinData>
}
}
*/
// Event is the decoded form of a MongoDB change stream event.
// ConvertEvent2Oplog unmarshals raw BSON into it and String renders it as JSON.
type Event struct {
// Id is the event "_id" document; its "_data" entry holds the resume token.
Id bson.M `bson:"_id" json:"_id"`
// OperationType is one of insert, delete, replace, update, drop, rename,
// dropDatabase, invalidate.
OperationType string `bson:"operationType" json:"operationType"`
FullDocument bson.D `bson:"fullDocument,omitempty" json:"fullDocument,omitempty"` // exists on "insert", "replace", "delete", "update"
// Ns is the namespace document with "db" and (for collection events) "coll".
Ns bson.M `bson:"ns" json:"ns"`
// To is the target namespace; only meaningful for "rename" events.
To bson.M `bson:"to,omitempty" json:"to,omitempty"`
DocumentKey bson.D `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists on "insert", "replace", "delete", "update"
// UpdateDescription carries "updatedFields" and "removedFields" of an "update" event.
UpdateDescription bson.M `bson:"updateDescription,omitempty" json:"updateDescription,omitempty"`
// ClusterTime is the counterpart of the oplog "ts" field.
ClusterTime primitive.Timestamp `bson:"clusterTime,omitempty" json:"clusterTime,omitempty"`
// TxnNumber is the transaction number; nil when the event carries none.
TxnNumber *int64 `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"`
// LSID is the raw logical session id document, kept undecoded.
LSID bson.Raw `bson:"lsid,omitempty" json:"lsid,omitempty"`
}
// String renders the event as JSON. If marshalling fails, the text of the
// marshalling error is returned instead.
func (e *Event) String() string {
	encoded, err := json.Marshal(e)
	if err != nil {
		return err.Error()
	}
	return string(encoded)
}
// ConvertEvent2Oplog decodes a raw BSON change stream event and translates it
// into the equivalent oplog-style PartialLog.
//
// input is the BSON-encoded change stream event. fulldoc selects how "update"
// events are translated: when it is true and the event carries a non-empty
// fullDocument, the whole document is written through "$set"; otherwise the
// "$set"/"$unset" operators are rebuilt from updateDescription.
//
// An error is returned when the input cannot be unmarshalled, when the
// operation type is "invalidate" or unknown, or when updateDescription does
// not have the expected shape.
func ConvertEvent2Oplog(input []byte, fulldoc bool) (*PartialLog, error) {
	event := new(Event)
	if err := bson.Unmarshal(input, event); err != nil {
		// the raw bson is binary, so dump it as hex to keep the message printable
		return nil, fmt.Errorf("unmarshal raw bson[%x] failed: %v", input, err)
	}

	oplog := new(PartialLog)
	// ts
	oplog.Timestamp = event.ClusterTime
	// transaction number
	oplog.TxnNumber = event.TxnNumber
	// lsid
	oplog.LSID = event.LSID
	// documentKey
	if len(event.DocumentKey) > 0 {
		oplog.DocumentKey = event.DocumentKey
	}

	ns := event.Ns

	// do nothing for "g", "uk", "fromMigrate"

	// handle different operation type
	switch event.OperationType {
	case "insert":
		// insert zz.test {"kick":1}
		/* event:
		{
		    "_id" : {
		        "_data" : "825E4FA224000000012B022C0100296E5A100420D9F949CFC7496EA80E32BA633701A846645F696400645E4FA224A6717632D6EE2E850004"
		    },
		    "operationType" : "insert",
		    "clusterTime" : Timestamp(1582277156, 1),
		    "fullDocument" : {
		        "_id" : ObjectId("5e4fa224a6717632d6ee2e85"),
		        "kick" : 1
		    },
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    },
		    "documentKey" : {
		        "_id" : ObjectId("5e4fa224a6717632d6ee2e85")
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582277156, 1),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "i",
		    "ns" : "zz.test",
		    "ui" : UUID("20d9f949-cfc7-496e-a80e-32ba633701a8"),
		    "wall" : ISODate("2020-02-21T09:25:56.570Z"),
		    "o" : {
		        "_id" : ObjectId("5e4fa224a6717632d6ee2e85"),
		        "kick" : 1
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.%s", ns["db"], ns["coll"])
		oplog.Operation = "i"
		oplog.Object = event.FullDocument
	case "delete":
		// remove zz.test {"kick":1}
		/* event:
		{
		    "_id" : {
		        "_data" : "825E537D02000000012B022C0100296E5A1004EE9B60D8845F42FF989D09018A730D6046645F696400645E537CF27DC0F30426F01B770004"
		    },
		    "operationType" : "delete",
		    "clusterTime" : Timestamp(1582529794, 1),
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    },
		    "documentKey" : {
		        "_id" : ObjectId("5e537cf27dc0f30426f01b77")
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582529794, 1),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "d",
		    "ns" : "zz.test",
		    "ui" : UUID("ee9b60d8-845f-42ff-989d-09018a730d60"),
		    "wall" : ISODate("2020-02-24T07:36:34.063Z"),
		    "o" : {
		        "_id" : ObjectId("5e537cf27dc0f30426f01b77")
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.%s", ns["db"], ns["coll"])
		oplog.Operation = "d"
		oplog.Object = event.DocumentKey
	case "replace":
		// db.test.update({"kick":1}, {"kick":10, "ok":true})
		/* event:
		{
		    "_id" : {
		        "_data" : "825E538501000000012B022C0100296E5A1004EE9B60D8845F42FF989D09018A730D6046645F696400645E5384F97DC0F30426F01B790004"
		    },
		    "operationType" : "replace",
		    "clusterTime" : Timestamp(1582531841, 1),
		    "fullDocument" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79"),
		        "kick" : 10,
		        "ok" : true
		    },
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    },
		    "documentKey" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79")
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582531841, 1),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "u",
		    "ns" : "zz.test",
		    "ui" : UUID("ee9b60d8-845f-42ff-989d-09018a730d60"),
		    "o2" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79")
		    },
		    "wall" : ISODate("2020-02-24T08:10:41.636Z"),
		    "o" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79"),
		        "kick" : 10,
		        "ok" : true
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.%s", ns["db"], ns["coll"])
		oplog.Operation = "u"
		oplog.Query = event.DocumentKey
		// NOTE(review): the example oplog above carries the plain document in "o",
		// while this wraps it in "$set" - confirm this is what the replayer expects.
		oplog.Object = bson.D{primitive.E{Key: "$set", Value: event.FullDocument}}
	case "update":
		/*
		 * mgset-xxx:PRIMARY> db.test.find()
		 * { "_id" : ObjectId("5e5384f97dc0f30426f01b79"), "kick" : 10, "ok" : true }
		 * mgset-xxx:PRIMARY> db.test.update({"kick":10}, {$set:{"plus_field":2}, $unset:{"ok":1}})
		 * mgset-xxx:PRIMARY> db.test.find()
		 * { "_id" : ObjectId("5e5384f97dc0f30426f01b79"), "kick" : 10, "plus_field" : 2 }
		 */
		/* event:
		{
		    "_id" : {
		        "_data" : "825E5389D5000000022B022C0100296E5A1004EE9B60D8845F42FF989D09018A730D6046645F696400645E5384F97DC0F30426F01B790004"
		    },
		    "operationType" : "update",
		    "clusterTime" : Timestamp(1582533077, 2),
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    },
		    "documentKey" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79")
		    },
		    "updateDescription" : {
		        "updatedFields" : {
		            "plus_field" : 2
		        },
		        "removedFields" : [ "ok" ]
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582533077, 2),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "u",
		    "ns" : "zz.test",
		    "ui" : UUID("ee9b60d8-845f-42ff-989d-09018a730d60"),
		    "o2" : {
		        "_id" : ObjectId("5e5384f97dc0f30426f01b79")
		    },
		    "wall" : ISODate("2020-02-24T08:31:17.681Z"),
		    "o" : {
		        "$v" : 1,
		        "$unset" : {
		            "ok" : true
		        },
		        "$set" : {
		            "plus_field" : 2
		        }
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.%s", ns["db"], ns["coll"])
		oplog.Operation = "u"
		oplog.Query = event.DocumentKey

		if fulldoc && len(event.FullDocument) > 0 {
			oplog.Object = bson.D{primitive.E{Key: "$set", Value: event.FullDocument}}
		} else {
			oplog.Object = make(bson.D, 0, 2)

			// updatedFields -> "$set"; the type is checked so that an unexpected
			// event shape yields an error instead of a panic
			if rawUpdated, ok := event.UpdateDescription["updatedFields"]; ok {
				updatedFields, isDoc := rawUpdated.(bson.M)
				if !isDoc {
					return nil, fmt.Errorf("unexpected type[%T] of updateDescription.updatedFields in event[%v]",
						rawUpdated, event)
				}
				if len(updatedFields) > 0 {
					oplog.Object = append(oplog.Object, primitive.E{
						Key:   "$set",
						Value: updatedFields,
					})
				}
			}

			// removedFields -> "$unset"
			if rawRemoved, ok := event.UpdateDescription["removedFields"]; ok {
				removedFields, isArray := rawRemoved.(primitive.A)
				if !isArray {
					return nil, fmt.Errorf("unexpected type[%T] of updateDescription.removedFields in event[%v]",
						rawRemoved, event)
				}
				if len(removedFields) > 0 {
					removedFieldsMap := make(bson.M, len(removedFields))
					for _, ele := range removedFields {
						field, isString := ele.(string)
						if !isString {
							return nil, fmt.Errorf("unexpected type[%T] of element in updateDescription.removedFields in event[%v]",
								ele, event)
						}
						removedFieldsMap[field] = 1
					}
					oplog.Object = append(oplog.Object, primitive.E{
						Key:   "$unset",
						Value: removedFieldsMap,
					})
				}
			}
		}
	case "drop":
		// mgset-xxx:PRIMARY> db.test.drop()
		/* event:
		{
		    "_id" : {
		        "_data" : "825E538DF7000000012B022C0100296E5A1004EE9B60D8845F42FF989D09018A730D6004"
		    },
		    "operationType" : "drop",
		    "clusterTime" : Timestamp(1582534135, 1),
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582534135, 1),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "c",
		    "ns" : "zz.$cmd",
		    "ui" : UUID("ee9b60d8-845f-42ff-989d-09018a730d60"),
		    "o2" : {
		        "numRecords" : 3
		    },
		    "wall" : ISODate("2020-02-24T08:48:55.148Z"),
		    "o" : {
		        "drop" : "test"
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.$cmd", ns["db"])
		oplog.Operation = "c"
		oplog.Object = bson.D{
			primitive.E{
				Key:   "drop",
				Value: ns["coll"],
			},
		}
		// ignore o2
	case "rename":
		// mgset-22785363:PRIMARY> db.test.renameCollection("test_collection_rename")
		/* event:
		{
		    "to" : {
		        "db" : "zz",
		        "coll" : "test_collection_rename"
		    },
		    "_id" : {
		        "_data" : "825E53910A000000012B022C0100296E5A1004DA21CB04A4C846AD8B3C3E8E9314F4B504"
		    },
		    "operationType" : "rename",
		    "clusterTime" : Timestamp(1582534922, 1),
		    "ns" : {
		        "db" : "zz",
		        "coll" : "test"
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582534922, 1),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "c",
		    "ns" : "zz.$cmd",
		    "ui" : UUID("da21cb04-a4c8-46ad-8b3c-3e8e9314f4b5"),
		    "wall" : ISODate("2020-02-24T09:02:02.685Z"),
		    "o" : {
		        "renameCollection" : "zz.test",
		        "to" : "zz.test_collection_rename",
		        "stayTemp" : false,
		        "dropTarget": UUID("52c1c147-2408-4d96-9d0f-889a759ab079") // this field only exists when the target collection already exists
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.$cmd", ns["db"])
		oplog.Operation = "c"
		oplog.Object = bson.D{ // should enable drop_database option on the replayer by default
			primitive.E{
				Key:   "renameCollection",
				Value: fmt.Sprintf("%s.%s", ns["db"], ns["coll"]),
			},
			primitive.E{
				Key:   "to",
				Value: fmt.Sprintf("%s.%s", event.To["db"], event.To["coll"]),
			},
		}
	case "dropDatabase":
		// mgset-22785363:PRIMARY> db.dropDatabase()
		/* event:
		// before the next event, there'll be several "drop" events about the inner collection remove
		{
		    "_id" : {
		        "_data" : "825E5393A5000000042B022C0100296E04"
		    },
		    "operationType" : "dropDatabase",
		    "clusterTime" : Timestamp(1582535589, 4),
		    "ns" : {
		        "db" : "zz"
		    }
		}
		*/
		/* oplog:
		{
		    "ts" : Timestamp(1582535589, 4),
		    "t" : NumberLong(1),
		    "h" : NumberLong(0),
		    "v" : 2,
		    "op" : "c",
		    "ns" : "zz.$cmd",
		    "wall" : ISODate("2020-02-24T09:13:09.165Z"),
		    "o" : {
		        "dropDatabase" : 1
		    }
		}
		*/
		oplog.Namespace = fmt.Sprintf("%s.$cmd", ns["db"])
		oplog.Operation = "c"
		oplog.Object = bson.D{
			primitive.E{
				Key:   "dropDatabase",
				Value: 1,
			},
		}
	case "invalidate":
		/*
		 * this case shouldn't happen because we watch the whole MongoDB, so it is
		 * reported as an error that has to be handled manually in order to find
		 * the root cause.
		 */
		return nil, fmt.Errorf("invalidate event happen, should be handle manually: %s", event)
	default:
		return nil, fmt.Errorf("unknown event type[%v] org_event[%v]", event.OperationType, event)
	}

	// set default for "o", "o2"
	if oplog.Object == nil {
		oplog.Object = bson.D{}
	}
	if oplog.Query == nil {
		oplog.Query = bson.D{}
	}

	LOG.Debug("ConvertEvent2Oplog Event[%v] to Oplog[%v]", event, oplog)
	return oplog, nil
}