oplog/oplog.go (332 lines of code) (raw):
package oplog
import (
"encoding/json"
"fmt"
LOG "github.com/vinllen/log4go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"reflect"
"strings"
)
const (
PrimaryKey = "_id"
)
type GenericOplog struct {
Raw []byte
Parsed *PartialLog
}
type ParsedLog struct {
Timestamp primitive.Timestamp `bson:"ts" json:"ts"`
Term *int64 `bson:"t" json:"t"`
Hash *int64 `bson:"h" json:"h"`
Version int `bson:"v" json:"v"`
Operation string `bson:"op" json:"op"`
Gid string `bson:"g,omitempty" json:"g,omitempty"`
Namespace string `bson:"ns" json:"ns"`
Object bson.D `bson:"o" json:"o"`
Query bson.D `bson:"o2" json:"o2"` // update condition
UniqueIndexes bson.M `bson:"uk,omitempty" json:"uk,omitempty"` //
LSID bson.Raw `bson:"lsid,omitempty" json:"lsid,omitempty"` // mark the session id, used in transaction
FromMigrate bool `bson:"fromMigrate,omitempty" json:"fromMigrate,omitempty"` // move chunk
TxnNumber *int64 `bson:"txnNumber,omitempty" json:"txnNumber,omitempty"` // transaction number in session
DocumentKey bson.D `bson:"documentKey,omitempty" json:"documentKey,omitempty"` // exists when source collection is sharded, only including shard key and _id
PrevOpTime bson.Raw `bson:"prevOpTime,omitempty"`
UI *primitive.Binary `bson:"ui,omitempty" json:"ui,omitempty"` // do not enable currently
}
type PartialLog struct {
ParsedLog
/*
* Every field subsequent declared is NEVER persistent or
* transfer on network connection. They only be parsed from
* respective logic
*/
UniqueIndexesUpdates bson.M // generate by CollisionMatrix
RawSize int // generate by Decorator
SourceId int // generate by Validator
}
func LogEntryEncode(logs []*GenericOplog) [][]byte {
encodedLogs := make([][]byte, 0, len(logs))
// log entry encode
for _, log := range logs {
if log.Raw == nil {
if out, err := bson.Marshal(log.Parsed); err != nil {
LOG.Crashf("LogEntryEncode marshal Oplog[%v] failed[%v]", log.Parsed, err)
} else {
encodedLogs = append(encodedLogs, out)
}
} else {
encodedLogs = append(encodedLogs, log.Raw)
}
}
return encodedLogs
}
func LogParsed(logs []*GenericOplog) []*PartialLog {
parsedLogs := make([]*PartialLog, len(logs), len(logs))
for i, log := range logs {
parsedLogs[i] = log.Parsed
}
return parsedLogs
}
func NewPartialLog(data bson.M) *PartialLog {
parsedLog := new(ParsedLog)
logType := reflect.TypeOf(*parsedLog)
for i := 0; i < logType.NumField(); i++ {
tagNameWithOption := logType.Field(i).Tag.Get("bson")
tagName := strings.Split(tagNameWithOption, ",")[0]
if v, ok := data[tagName]; ok {
reflect.ValueOf(parsedLog).Elem().Field(i).Set(reflect.ValueOf(v))
}
}
return &PartialLog{
ParsedLog: *parsedLog,
}
}
func (partialLog *PartialLog) String() string {
if ret, err := json.Marshal(partialLog.ParsedLog); err != nil {
return err.Error()
} else {
return string(ret)
}
}
// dump according to the given keys, "all" == true means ignore keys
func (partialLog *PartialLog) Dump(keys map[string]struct{}, all bool) bson.D {
var out bson.D
logType := reflect.TypeOf(partialLog.ParsedLog)
for i := 0; i < logType.NumField(); i++ {
if tagNameWithOption, ok := logType.Field(i).Tag.Lookup("bson"); ok {
// out[tagName] = reflect.ValueOf(partialLog).Elem().Field(i).Interface()
value := reflect.ValueOf(partialLog.ParsedLog).Field(i).Interface()
tagName := strings.Split(tagNameWithOption, ",")[0]
if !all {
if _, ok := keys[tagName]; !ok {
continue
}
}
out = append(out, primitive.E{tagName, value})
}
}
return out
}
func GetKey(log bson.D, wanted string) interface{} {
ret, _ := GetKeyWithIndex(log, wanted)
return ret
}
func GetKeyWithIndex(log bson.D, wanted string) (interface{}, int) {
if wanted == "" {
wanted = PrimaryKey
}
// "_id" is always the first field
for id, ele := range log {
if ele.Key == wanted {
return ele.Value, id
}
}
return nil, 0
}
func ConvertBsonD2MExcept(input bson.D, except map[string]struct{}) (bson.M, map[string]struct{}) {
m := bson.M{}
keys := make(map[string]struct{}, len(input))
for _, ele := range input {
switch ele.Value.(type) {
case bson.D:
if _, ok := except[ele.Key]; ok {
m[ele.Key] = ele.Value
} else {
son, _ := ConvertBsonD2M(ele.Value.(bson.D))
m[ele.Key] = son
}
default:
m[ele.Key] = ele.Value
}
keys[ele.Key] = struct{}{}
}
return m, keys
}
// convert bson.D to bson.M
func ConvertBsonD2M(input bson.D) (bson.M, map[string]struct{}) {
m := bson.M{}
keys := make(map[string]struct{}, len(input))
for _, ele := range input {
m[ele.Key] = ele.Value
switch ele.Value.(type) {
case bson.D:
son, _ := ConvertBsonD2M(ele.Value.(bson.D))
m[ele.Key] = son
default:
m[ele.Key] = ele.Value
}
keys[ele.Key] = struct{}{}
}
return m, keys
}
func ConvertBsonM2D(input bson.M) bson.D {
output := make(bson.D, 0, len(input))
for key, val := range input {
output = append(output, primitive.E{
Key: key,
Value: val,
})
}
return output
}
// ConvertBsonM2E convert a bson.M with only 1 elem to bson.E, which can be added to other bson.D
func ConvertBsonM2E(input bson.M) (bson.E, error) {
if len(input) > 1 {
return bson.E{}, fmt.Errorf("input bson.M has multi elems, can't convert to bson.E")
}
var e bson.E
for key, val := range input {
e = bson.E{Key: key, Value: val}
break
}
return e, nil
}
// pay attention: the input bson.D will be modified.
func RemoveFiled(input bson.D, key string) bson.D {
flag := -1
for id := range input {
if input[id].Key == key {
flag = id
break
}
}
if flag != -1 {
input = append(input[:flag], input[flag+1:]...)
}
return input
}
func FindFiledPrefix(input bson.D, prefix string) bool {
for id := range input {
if strings.HasPrefix(input[id].Key, prefix) {
return true
}
}
return false
}
func SetFiled(input bson.D, key string, value interface{}) {
for i, ele := range input {
if ele.Key == key {
input[i].Value = value
}
}
}
func GatherApplyOps(input []*PartialLog) (*GenericOplog, error) {
if len(input) == 0 {
return nil, fmt.Errorf("input list is empty")
}
newOplog := &PartialLog{
ParsedLog: ParsedLog{
Timestamp: input[0].Timestamp,
Operation: "c",
Gid: input[0].Gid,
Namespace: "admin.$cmd",
UniqueIndexes: input[0].UniqueIndexes,
LSID: input[0].LSID,
FromMigrate: input[0].FromMigrate,
},
}
applyOpsList := make([]bson.M, 0, len(input))
for _, ele := range input {
applyOpsList = append(applyOpsList, bson.M{
"op": ele.Operation,
"ns": ele.Namespace,
"o": ele.Object,
"o2": ele.Query,
})
}
newOplog.Object = bson.D{
primitive.E{
Key: "applyOps",
Value: applyOpsList,
},
}
if out, err := bson.Marshal(newOplog); err != nil {
return nil, fmt.Errorf("marshal new oplog[%v] failed[%v]", newOplog, err)
} else {
return &GenericOplog{
Raw: out,
Parsed: newOplog,
}, nil
}
}
// Oplog from mongod(5.0) in sharding&replica
// {"ts":{"T":1653449035,"I":3},"v":2,"op":"u","ns":"test.bar",
// "o":[{"Key":"diff","Value":[{"Key":"d","Value":[{"Key":"ok","Value":false}]},
// {"Key":"i","Value":[{"Key":"plus_field","Value":2}]}]}],
// "o2":[{"Key":"_id","Value":"628da11482387c117d4e9e45"}]}
// "o" : { "$v" : 2, "diff" : { "d" : { "count" : false }, "u" : { "name" : "orange" }, "i" : { "c" : 11 } } }
func DiffUpdateOplogToNormal(updateObj bson.D) (interface{}, error) {
diffObj := GetKey(updateObj, "diff")
if diffObj == nil {
return updateObj, fmt.Errorf("don't have diff field updateObj:[%v]", updateObj)
}
bsonDiffObj, ok := diffObj.(bson.D)
if !ok {
return updateObj, fmt.Errorf("diff field is not bson.D updateObj:[%v]", updateObj)
}
result, err := BuildUpdateDelteOplog("", bsonDiffObj)
if err != nil {
return updateObj, fmt.Errorf("parse diffOplog failed updateObj:[%v] err[%v]", updateObj, err)
}
return result, nil
}
func BuildUpdateDelteOplog(prefixField string, obj bson.D) (interface{}, error) {
var result bson.D
for _, ele := range obj {
if ele.Key == "d" {
result = append(result, primitive.E{
Key: "$unset",
Value: combinePrefixField(prefixField, ele.Value)})
} else if ele.Key == "i" || ele.Key == "u" {
result = append(result, primitive.E{
Key: "$set",
Value: combinePrefixField(prefixField, ele.Value)})
} else if len(ele.Key) > 1 && ele.Key[0] == 's' {
// s means subgroup field(array or nest)
tmpPrefixField := ""
if len(prefixField) == 0 {
tmpPrefixField = ele.Key[1:]
} else {
tmpPrefixField = prefixField + "." + ele.Key[1:]
}
nestObj, err := BuildUpdateDelteOplog(tmpPrefixField, ele.Value.(bson.D))
if err != nil {
return obj, fmt.Errorf("parse ele[%v] failed, updateObj:[%v]", ele, obj)
}
if _, ok := nestObj.(mongo.Pipeline); ok {
return nestObj, nil
} else if _, ok := nestObj.(bson.D); ok {
for _, nestObjEle := range nestObj.(bson.D) {
result = append(result, nestObjEle)
}
} else {
return obj, fmt.Errorf("unknown nest type ele[%v] updateObj:[%v] nestObj[%v]", ele, obj, nestObj)
}
} else if len(ele.Key) > 1 && ele.Key[0] == 'u' {
result = append(result, primitive.E{
Key: "$set",
Value: bson.D{
primitive.E{
Key: prefixField + "." + ele.Key[1:],
Value: ele.Value,
},
},
})
} else if ele.Key == "l" {
if len(result) != 0 {
return obj, fmt.Errorf("len should be 0, Key[%v] updateObj:[%v], result:[%v]",
ele, obj, result)
}
return mongo.Pipeline{
{{"$set", bson.D{
{prefixField, bson.D{
{"$slice", []interface{}{"$" + prefixField, ele.Value}},
}},
}}},
}, nil
} else if ele.Key == "a" && ele.Value == true {
continue
} else {
return obj, fmt.Errorf("unknow Key[%v] updateObj:[%v]", ele, obj)
}
}
return result, nil
}
func combinePrefixField(prefixField string, obj interface{}) interface{} {
if len(prefixField) == 0 {
return obj
}
tmpObj, ok := obj.(bson.D)
if !ok {
return obj
}
var result bson.D
for _, ele := range tmpObj {
result = append(result, primitive.E{
Key: prefixField + "." + ele.Key,
Value: ele.Value})
}
return result
}