executor/executor.go
package executor
import (
"fmt"
"go.mongodb.org/mongo-driver/bson"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
"github.com/alibaba/MongoShake/v2/collector/transform"
utils "github.com/alibaba/MongoShake/v2/common"
"github.com/alibaba/MongoShake/v2/oplog"
nimo "github.com/gugemichael/nimo4go"
LOG "github.com/vinllen/log4go"
)
const (
DumpConflictToDB = "db"
NoDumpConflict = "none"
ExecuteOrdered = false
OpInsert = 0x01
OpUpdate = 0x02
OplogsMaxGroupNum = 1000
OplogsMaxGroupSize = 12 * 1024 * 1024 // stay well below MongoDB's 16MB request limit
)
var (
GlobalExecutorId int32 = -1
ThresholdVersion string = "3.2.0"
)
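// PartialLogWithCallbak pairs a parsed oplog entry with an optional callback
// that is invoked once the batch containing the entry has been replayed.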
type PartialLogWithCallbak struct {
partialLog *oplog.PartialLog
callback func()
}
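// BatchGroupExecutor fans incoming oplog batches out to a fixed set of child
// Executors, sharding entries by their _id so that writes to the same document
// keep their original order.
//
// A minimal usage sketch; the MongoDB URL and the callback body are
// illustrative assumptions, not values taken from this repository:
//
//	batchExecutor := &BatchGroupExecutor{
//		ReplayerId: 0,
//		MongoUrl:   "mongodb://127.0.0.1:27017",
//	}
//	batchExecutor.Start()
//	batchExecutor.Sync(rawLogs, func() { /* acknowledge the batch upstream */ })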
type BatchGroupExecutor struct {
// multi executor
executors []*Executor
// worker id
ReplayerId uint32
// mongo url
MongoUrl string
// namespace transform rules
NsTrans *transform.NamespaceTransform
// timestamp when the full (initial) sync finished
FullFinishTs int64
}
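// Start creates conf.Options.IncrSyncExecutor child executors, wires up the
// namespace transform when transform rules are configured, and launches each
// executor's event loop in its own goroutine.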
func (batchExecutor *BatchGroupExecutor) Start() {
// The maximum number of concurrent executor connections is 64, and the total
// number of connections = number of executors * number of batchExecutors, so
// normally the maximum is 64. If the collector hashes oplogRecords by _id and
// the number of collectors is larger, a single executor is used in each
// batchExecutor.
parallel := conf.Options.IncrSyncExecutor
if len(conf.Options.TransformNamespace) > 0 {
batchExecutor.NsTrans = transform.NewNamespaceTransform(conf.Options.TransformNamespace)
}
executors := make([]*Executor, parallel)
for i := 0; i != len(executors); i++ {
executors[i] = NewExecutor(GenerateExecutorId(), batchExecutor, batchExecutor.MongoUrl)
executors[i].RestAPI()
go executors[i].start()
}
batchExecutor.executors = executors
}
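// Sync wraps the raw oplog entries, attaches the callback to the last entry of
// the batch (only that entry triggers the notification), and hands the batch
// to replay. An empty batch, e.g. a probe request, is ignored.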
func (batchExecutor *BatchGroupExecutor) Sync(rawLogs []*oplog.PartialLog, callback func()) {
count := uint64(len(rawLogs))
if count == 0 {
// may be a probe request
return
}
logs := make([]*PartialLogWithCallbak, len(rawLogs), len(rawLogs))
// populate the batch buffer first
for i, rawLog := range rawLogs {
logs[i] = &PartialLogWithCallbak{partialLog: rawLog}
}
// only the last oplog message in the batch carries the callback
logs[len(logs)-1].callback = callback
batchExecutor.replay(logs)
}
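// replay splits the batch into segments that are safe to execute together:
// when conf.Options.IncrSyncCollisionEnable is set, a barrier matrix keeps
// conflicting unique-index operations in separate segments; otherwise a no-op
// matrix passes the whole batch through. Each segment is then executed in
// parallel across the executors.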
func (batchExecutor *BatchGroupExecutor) replay(logs []*PartialLogWithCallbak) {
// TODO: skip the oplogRecords that have already been replayed
// lastTs := utils.TimestampToInt64(logs[len(logs)-1].partialLog.Timestamp)
// if batchExecutor.replayer.Ack >= lastTs {
// // every oplog in the buffer has already been executed previously,
// // so simply discard them. Even oplogRecords with smaller timestamps
// // may have been changed (by another collector or another mongos source)
// return
// }
// the executor needs to check for pausing or throttling here.
batchExecutor.replicateShouldStall(int64(len(logs)))
// In a sharded MongoDB cluster our requests go through mongos, so unique
// indexes are safe without collision detection.
var matrix CollisionMatrix = &NoopMatrix{}
if conf.Options.IncrSyncCollisionEnable {
matrix = NewBarrierMatrix()
}
// First, split the oplogRecords into segments, which are the units of safe
// execution: within a single segment there are no conflicting operations on
// the same unique index.
var segments = matrix.split(logs)
// Second, within each segment, analyze the dependencies between the
// oplogRecords and execute them in parallel.
for _, segment := range segments {
toBeExecuted := matrix.convert(segment)
batchExecutor.executeInParallel(toBeExecuted)
}
}
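// replicateShouldStall is a hook where the executor could pause or throttle
// replay before applying a batch of n oplog entries; it is currently a no-op.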
// TODO
func (batchExecutor *BatchGroupExecutor) replicateShouldStall(n int64) {
}
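// executeInParallel shards the oplogRecords across the executors by _id,
// pushes each non-empty shard into the matching executor's batchBlock channel,
// waits on the shared WaitGroup until every record has been applied, and only
// then invokes the collected callbacks in their original order.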
func (batchExecutor *BatchGroupExecutor) executeInParallel(logs []*OplogRecord) {
// prepare execution monitor
latch := new(sync.WaitGroup)
latch.Add(len(logs))
// shard oplogRecords by the _id primary key and build up the callback chain
var buffer = make([][]*OplogRecord, len(batchExecutor.executors))
shardKey := oplog.PrimaryKeyHasher{}
var completionList []func()
for _, log := range logs {
selected := shardKey.DistributeOplogByMod(log.original.partialLog, len(batchExecutor.executors))
buffer[selected] = append(buffer[selected], log)
if log.original.callback != nil {
// should be ordered by the incoming sequence
completionList = append(completionList, log.original.callback)
}
}
for index, buf := range buffer {
if len(buf) != 0 {
nimo.AssertTrue(len(batchExecutor.executors[index].batchBlock) == 0, "executors buffer is not empty!")
nimo.AssertTrue(batchExecutor.executors[index].finisher == nil, "executors await status is wrong!")
batchExecutor.executors[index].finisher = latch
// follow the memory model: finisher must be assigned before sending on the
// batchBlock channel, because it is read after the receive from batchBlock
batchExecutor.executors[index].batchBlock <- buf
}
}
// wait for execution to complete
latch.Wait()
// invoke all callbacks
for _, callback := range completionList {
callback()
}
// clear the executors' finisher handles before the next batch
for _, exec := range batchExecutor.executors {
exec.finisher = nil
}
}
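// Executor is a single replay worker: it owns one MongoDB connection, applies
// the oplog batches it receives on batchBlock, journals them, and keeps
// per-operation metrics that are exposed through the REST API.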
type Executor struct {
// sequence index id in each replayer
id int
// batchExecutor, not owned
batchExecutor *BatchGroupExecutor
// records all oplogRecords into journal files
journal *utils.Journal
// mongo url
MongoUrl string
batchBlock chan []*OplogRecord
finisher *sync.WaitGroup
// mongo connection
conn *utils.MongoCommunityConn
// bulk insert or single insert
bulkInsert bool
// metric
metricInsert uint64
metricInsertMap sync.Map
metricUpdate uint64
metricUpdateMap sync.Map
metricDelete uint64
metricDeleteMap sync.Map
metricDDL uint64
metricDDLMap sync.Map
metricUnknown uint64
metricUnknownMap sync.Map
metricNoop uint64
metricNoopMap sync.Map
metricError uint64
metricErrorMap sync.Map
}
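// GenerateExecutorId returns a process-wide unique, monotonically increasing
// executor id starting from 0.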
func GenerateExecutorId() int {
return int(atomic.AddInt32(&GlobalExecutorId, 1))
}
func NewExecutor(id int, batchExecutor *BatchGroupExecutor, MongoUrl string) *Executor {
return &Executor{
id: id,
batchExecutor: batchExecutor,
journal: utils.NewJournal(utils.JournalFileName(fmt.Sprintf("direct.%03d", id))),
MongoUrl: MongoUrl,
batchBlock: make(chan []*OplogRecord, 1),
}
}
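// start is the executor's event loop: it blocks on batchBlock, retries doSync
// once per second until the batch succeeds, decrements the batch's WaitGroup,
// and finally journals the applied oplogRecords.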
func (exec *Executor) start() {
for toBeExecuted := range exec.batchBlock {
nimo.AssertTrue(len(toBeExecuted) != 0, "the size of being executed batch oplogRecords could not be zero")
for exec.doSync(toBeExecuted) != nil {
time.Sleep(time.Second)
}
// acknowledge all oplogRecords have been successfully executed
exec.finisher.Add(-len(toBeExecuted))
// record all oplogRecords in the journal (if enabled) after they have been written successfully
for _, log := range toBeExecuted {
exec.journal.WriteRecord(log.original.partialLog)
}
}
}
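// doSync applies namespace transforms, sorts non-command entries by namespace
// so adjacent entries can be merged, combines them into (ns, op) groups of at
// most OplogsMaxGroupNum entries and OplogsMaxGroupSize bytes, and executes
// each group as a single MongoDB request.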
func (exec *Executor) doSync(logs []*OplogRecord) error {
count := len(logs)
transLogs := transformLogs(logs, exec.batchExecutor.NsTrans, conf.Options.IncrSyncDBRef)
// eventual consistency
// only sort CRUD operations for now, to make bulk inserts effective
if len(logs) > 0 && logs[0].original.partialLog.Operation != "c" {
sort.SliceStable(transLogs, func(i, j int) bool {
return transLogs[i].original.partialLog.Namespace < transLogs[j].original.partialLog.Namespace
})
}
// split the batched oplogRecords into (ns, op) groups. Each group can be
// executed in a single MongoDB request; the groups in this executor are
// executed sequentially.
oplogGroups := LogsGroupCombiner{maxGroupNr: OplogsMaxGroupNum,
maxGroupSize: OplogsMaxGroupSize}.mergeToGroups(transLogs)
for _, group := range oplogGroups {
if err := exec.execute(group); err != nil {
return err
}
}
LOG.Info("Replayer-%d Executor-%d doSync oplogRecords received[%d] merged[%d]. merge to %.2f%% chunks",
exec.batchExecutor.ReplayerId, exec.id, count, len(oplogGroups), float32(len(oplogGroups))*100.00/float32(count))
return nil
}
// transformLogs returns the original logs when no namespace transform is configured.
// For non-command logs it transforms the namespace and, when transformRef
// (conf.Options.IncrSyncDBRef) is set, the namespaces inside DBRef fields.
// For command logs it transforms the namespace/collection inside the oplog object.
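// As an illustration only (the exact rule syntax is owned by the transform
// package, not by this file), a rule along the lines of
// "srcDB.srcColl:dstDB.dstColl" would rewrite the namespace of every matching
// oplog entry before it is executed.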
func transformLogs(logs []*OplogRecord, nsTrans *transform.NamespaceTransform, transformRef bool) []*OplogRecord {
if nsTrans == nil {
return logs
}
for _, log := range logs {
partialLog := log.original.partialLog
transPartialLog := transformPartialLog(partialLog, nsTrans, transformRef)
if transPartialLog != nil {
log.original.partialLog = transPartialLog
}
}
return logs
}
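// transformPartialLog rewrites the namespaces inside a single oplog entry.
// For normal CRUD entries it rewrites the "ns" field (including the "ns"
// embedded in legacy system.indexes inserts); for command entries it rewrites
// the collection name inside the command object, with special handling for
// create/index/rename/applyOps commands. It returns nil when the entry cannot
// be interpreted and should be skipped.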
func transformPartialLog(partialLog *oplog.PartialLog, nsTrans *transform.NamespaceTransform, transformRef bool) *oplog.PartialLog {
db := strings.SplitN(partialLog.Namespace, ".", 2)[0]
if partialLog.Operation != "c" {
// {"op" : "i", "ns" : "my.system.indexes", "o" : { "v" : 2, "key" : { "date" : 1 }, "name" : "date_1", "ns" : "my.tbl", "expireAfterSeconds" : 3600 }
if strings.HasSuffix(partialLog.Namespace, "system.indexes") {
value := oplog.GetKey(partialLog.Object, "ns")
oplog.SetFiled(partialLog.Object, "ns", nsTrans.Transform(value.(string)))
}
partialLog.Namespace = nsTrans.Transform(partialLog.Namespace)
if transformRef {
partialLog.Object = transform.TransformDBRef(partialLog.Object, db, nsTrans)
}
} else {
operation, found := oplog.ExtraCommandName(partialLog.Object)
if !found {
LOG.Warn("extraCommandName meets type[%s] which is not implemented, ignore!", operation)
return nil
}
switch operation {
case "create":
// { "create" : "my", "idIndex" : { "v" : 2, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "my.my" }
if idIndex := oplog.GetKey(partialLog.Object, "idIndex"); idIndex != nil {
if ns := oplog.GetKey(idIndex.(bson.D), "ns"); ns != nil {
oplog.SetFiled(idIndex.(bson.D), "ns", nsTrans.Transform(ns.(string)))
}
} else {
LOG.Warn("transformLogs meet unknown create command: %v", partialLog.Object)
}
fallthrough
case "createIndexes":
fallthrough
case "commitIndexBuild":
fallthrough
case "collMod":
fallthrough
case "drop":
fallthrough
case "deleteIndex":
fallthrough
case "deleteIndexes":
fallthrough
case "dropIndex":
fallthrough
case "dropIndexes":
fallthrough
case "convertToCapped":
fallthrough
case "emptycapped":
col, ok := oplog.GetKey(partialLog.Object, operation).(string)
if !ok {
LOG.Warn("extraCommandName meets illegal %v oplog %v, ignore!", operation, partialLog.Object)
return nil
}
partialLog.Namespace = nsTrans.Transform(fmt.Sprintf("%s.%s", db, col))
oplog.SetFiled(partialLog.Object, operation, strings.SplitN(partialLog.Namespace, ".", 2)[1])
case "renameCollection":
// { "renameCollection" : "my.tbl", "to" : "my.my", "stayTemp" : false, "dropTarget" : false }
fromNs, ok := oplog.GetKey(partialLog.Object, operation).(string)
if !ok {
LOG.Warn("extraCommandName meets illegal %v oplog %v, ignore!", operation, partialLog.Object)
return nil
}
toNs, ok := oplog.GetKey(partialLog.Object, "to").(string)
if !ok {
LOG.Warn("extraCommandName meets illegal %v oplog %v, ignore!", operation, partialLog.Object)
return nil
}
partialLog.Namespace = nsTrans.Transform(fromNs)
oplog.SetFiled(partialLog.Object, operation, partialLog.Namespace)
oplog.SetFiled(partialLog.Object, "to", nsTrans.Transform(toNs))
case "applyOps":
if ops := oplog.GetKey(partialLog.Object, "applyOps").([]bson.D); ops != nil {
// exclude the field 'o' from conversion
except := map[string]struct{}{
"o": {},
}
for i, ele := range ops {
// m, keys := oplog.ConvertBsonD2M(ele)
m, keys := oplog.ConvertBsonD2MExcept(ele, except)
subLog := oplog.NewPartialLog(m)
transSubLog := transformPartialLog(subLog, nsTrans, transformRef)
if transSubLog == nil {
LOG.Warn("transformPartialLog sublog %v return nil, ignore!", subLog)
return nil
}
ops[i] = transSubLog.Dump(keys, false)
}
}
default:
// such as: dropDatabase
partialLog.Namespace = nsTrans.Transform(partialLog.Namespace)
}
}
return partialLog
}
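// Item is a (namespace, counter) pair used to report the busiest namespaces
// over the REST API.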
type Item struct {
Key string
Val uint64
}
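// RestAPI registers the /executor HTTP endpoint, which reports this executor's
// per-operation counters plus the top-3 namespaces per operation.
//
// An illustrative response (the numbers and namespaces are assumptions, not
// captured from a real run):
//
//	{"id": 0, "insert": 120, "update": 35, "delete": 2, "ddl": 1,
//	 "unknown": 0, "error": 0,
//	 "insert_ns_top_3": [{"Key": "db.coll", "Val": 100}], ...}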
func (exec *Executor) RestAPI() {
type ExecutorInfo struct {
Id int `json:"id"`
Insert uint64 `json:"insert"`
Update uint64 `json:"update"`
Delete uint64 `json:"delete"`
DDL uint64 `json:"ddl"`
Unknown uint64 `json:"unknown"`
Error uint64 `json:"error"`
// Noop uint64 `json:"noop"`
InsertMap []Item `json:"insert_ns_top_3"`
UpdateMap []Item `json:"update_ns_top_3"`
DeleteMap []Item `json:"delete_ns_top_3"`
DDLMap []Item `json:"ddl_ns_top_3"`
UnknownMap []Item `json:"unknown_ns_top_3"`
ErrorMap []Item `json:"error_ns_top_3"`
}
utils.IncrSyncHttpApi.RegisterAPI("/executor", nimo.HttpGet, func([]byte) interface{} {
return &ExecutorInfo{
Id: exec.id,
Insert: exec.metricInsert,
Update: exec.metricUpdate,
Delete: exec.metricDelete,
DDL: exec.metricDDL,
Unknown: exec.metricUnknown,
Error: exec.metricError,
InsertMap: calculateTop3(exec.metricInsertMap),
UpdateMap: calculateTop3(exec.metricUpdateMap),
DeleteMap: calculateTop3(exec.metricDeleteMap),
DDLMap: calculateTop3(exec.metricDDLMap),
UnknownMap: calculateTop3(exec.metricUnknownMap),
ErrorMap: calculateTop3(exec.metricErrorMap),
}
})
}
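// calculateTop3 walks a sync.Map of namespace -> *uint64 counters and returns
// up to three Items holding the largest counters, largest first.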
func calculateTop3(inputMap sync.Map) []Item {
/*
 * TODO: switch to a priority queue.
 * Go's container/heap package is more cumbersome to use here than its C++
 * counterpart.
 */
var max1, max2, max3 *Item
inputMap.Range(func(key, val interface{}) bool {
k := key.(string)
v := val.(*uint64)
if max3 != nil && max3.Val > *v {
return true
}
if max2 != nil && max2.Val > *v {
if max3 == nil {
max3 = new(Item)
}
max3.Key = k
max3.Val = *v
return true
}
if max1 != nil && max1.Val > *v {
max3 = max2
max2 = &Item{
Key: k,
Val: *v,
}
return true
}
max3 = max2
max2 = max1
max1 = &Item{
Key: k,
Val: *v,
}
return true
})
ret := make([]Item, 0, 3)
if max1 != nil {
ret = append(ret, *max1)
}
if max2 != nil {
ret = append(ret, *max2)
}
if max3 != nil {
ret = append(ret, *max3)
}
return ret
}