collector/batcher.go
package collector
import (
"time"

conf "github.com/alibaba/MongoShake/v2/collector/configure"
"github.com/alibaba/MongoShake/v2/collector/filter"
utils "github.com/alibaba/MongoShake/v2/common"
"github.com/alibaba/MongoShake/v2/oplog"

nimo "github.com/gugemichael/nimo4go"
LOG "github.com/vinllen/log4go"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
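// noopInterval is the longest time, in seconds, that getBatch blocks waiting for
// new oplogs before giving up and returning nil.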
const (
noopInterval = 10 // s
)
var (
moveChunkFilter filter.MigrateFilter
ddlFilter filter.DDLFilter
fakeOplog = &oplog.GenericOplog{
Raw: nil,
Parsed: &oplog.PartialLog{ // the initial fake oplog, used only in comparisons
ParsedLog: oplog.ParsedLog{
// a fake timestamp that never occurs in reality; it must be the smallest ts so that comparisons in setLastOplog always prefer a real oplog
Timestamp: primitive.Timestamp{T: 0, I: 0},
Operation: "meaningless operation",
},
},
}
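// emptyPrevRaw is the raw BSON encoding of {ts: Timestamp(0, 0), t: -1}, i.e. the
// prevOpTime of an oplog entry that has no predecessor.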
emptyPrevRaw = bson.Raw{
28, 0, 0, 0, 17, 116, 115, 0, 0, 0, 0, 0, 0, 0, 0, 0,
18, 116, 0, 255, 255, 255, 255, 255, 255, 255, 255, 0,
}
)
func getTargetDelay() int64 {
if utils.IncrSentinelOptions.TargetDelay < 0 {
return conf.Options.IncrSyncTargetDelay
}
return utils.IncrSentinelOptions.TargetDelay
}
func getExitPoint() primitive.Timestamp {
if utils.IncrSentinelOptions.ExitPoint <= 0 {
return primitive.Timestamp{}
}
// convert to a timestamp
return utils.Int64ToTimestamp(utils.IncrSentinelOptions.ExitPoint)
}
/*
* as mentioned in syncer.go, Batcher is used to batch oplogs before sending them, in
* order to improve performance.
*/
type Batcher struct {
// related oplog syncer. not owned
syncer *OplogSyncer
// filter functionality by gid
filterList filter.OplogFilterChain
// oplog handler
handler OplogHandler
// current queue cursor
nextQueue uint64
// related tunnel workerGroup. not owned
workerGroup []*Worker
// the last oplog in the batch
lastOplog *oplog.GenericOplog
// the last filtered oplog in the batch
lastFilterOplog *oplog.PartialLog
// remainLogs stores the logs that were split off by a barrier and haven't been consumed yet.
remainLogs []*oplog.GenericOplog
// inner usage of BatchMore
batchGroup [][]*oplog.GenericOplog
// oplogs that must be executed separately and sequentially, such as a DDL (a single oplog) or a transaction (several oplogs)
barrierOplogs []*oplog.GenericOplog
// transaction buffer
txnBuffer *oplog.TxnBuffer
// for ut only
utBatchesDelay struct {
flag bool // whether ut mode is enabled
injectBatch []*oplog.GenericOplog // input batched oplogs
delay int // the number of times the delay loop has slept
}
}
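// NewBatcher creates a Batcher that reads from the given syncer, applies the filter
// chain and handler, and distributes batches across workerGroup.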
func NewBatcher(syncer *OplogSyncer, filterList filter.OplogFilterChain,
handler OplogHandler, workerGroup []*Worker) *Batcher {
return &Batcher{
syncer: syncer,
filterList: filterList,
handler: handler,
workerGroup: workerGroup,
lastOplog: fakeOplog,
lastFilterOplog: fakeOplog.Parsed,
txnBuffer: oplog.NewBuffer(),
}
}
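// Fini releases the batcher's resources by stopping the transaction buffer.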
func (batcher *Batcher) Fini() {
batcher.txnBuffer.Stop()
}
/*
 * return the last oplog and the last filtered oplog. if the current batch is empty
 * (e.g. the first oplog in this batch is a ddl), this is the last oplog of the
 * previous batch. right after startup, this is the initial fake oplog.
 */
func (batcher *Batcher) getLastOplog() (*oplog.PartialLog, *oplog.PartialLog) {
return batcher.lastOplog.Parsed, batcher.lastFilterOplog
}
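// filter returns true if the oplog should be discarded. it crashes the process on
// move-chunk oplogs, and on DDL oplogs that appear before fullSyncFinishPosition
// when fetching from the oplog directly.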
func (batcher *Batcher) filter(log *oplog.PartialLog) bool {
// filter oplogs such as noops or those dropped by gid
if batcher.filterList.IterateFilter(log) {
LOG.Debug("%s oplog is filtered. %v", batcher.syncer, log)
batcher.syncer.replMetric.AddFilter(1)
return true
}
if moveChunkFilter.Filter(log) {
LOG.Critical("shake exit, must close balancer in sharding + oplog")
LOG.Crashf("move chunk oplog found, must close balancer in sharding + oplog [%v]", log)
return false
}
// DDL is disabled when the oplog timestamp <= fullSyncFinishPosition
// v2.4.10: do not crash when "fetch_method" == "change_stream"
if ddlFilter.Filter(log) &&
primitive.CompareTimestamp(log.Timestamp, batcher.syncer.fullSyncFinishPosition) <= 0 &&
conf.Options.IncrSyncMongoFetchMethod == utils.VarIncrSyncMongoFetchMethodOplog {
LOG.Crashf("%s ddl oplog found[%v] when oplog timestamp[%v] less than fullSyncFinishPosition[%v]",
batcher.syncer, log, log.Timestamp,
batcher.syncer.fullSyncFinishPosition)
return false
}
return false
}
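// dispatchBatches offers every batch (even nil ones) to its worker; it returns true
// if at least one non-nil batch was dispatched.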
func (batcher *Batcher) dispatchBatches(batchGroup [][]*oplog.GenericOplog) (work bool) {
for i, batch := range batchGroup {
// we still push even if the batch length is zero, so there is no length check here
if batch != nil {
work = true
batcher.workerGroup[i].AllAcked(false)
}
batcher.workerGroup[i].Offer(batch)
}
return
}
// getBatch returns one batch of oplogs: it drains remainLogs first if present,
// otherwise it blocks on the current logs queue and merges in more from subsequent queues.
func (batcher *Batcher) getBatch() []*oplog.GenericOplog {
syncer := batcher.syncer
var mergeBatch []*oplog.GenericOplog
if len(batcher.remainLogs) == 0 {
// remainLogs is empty.
select {
case mergeBatch = <-syncer.logsQueue[batcher.currentQueue()]:
break
case <-time.After(noopInterval * time.Second):
// return nil if timeout
return nil
}
// move to next available logs queue
batcher.moveToNextQueue()
for len(mergeBatch) < conf.Options.IncrSyncAdaptiveBatchingMaxSize &&
len(syncer.logsQueue[batcher.currentQueue()]) > 0 {
// more oplogs have been pushed into the next logs queue (the read won't block),
// so we fetch and merge them along the way
mergeBatch = append(mergeBatch, <-syncer.logsQueue[batcher.currentQueue()]...)
batcher.moveToNextQueue()
}
} else {
// remainLogs isn't empty
mergeBatch = batcher.remainLogs
// we can't use "batcher.remainLogs = batcher.remainLogs[:0]" here: mergeBatch still
// aliases the old backing array, so later appends would overwrite its elements
batcher.remainLogs = make([]*oplog.GenericOplog, 0)
}
nimo.AssertTrue(len(mergeBatch) != 0, "batch from logs queue must not be empty")
return mergeBatch
}
/*
 * if delay > 0, this function waits until the delay times out.
 * However, if mergeBatch contains many oplogs, the delay time depends only on the
 * first oplog's timestamp. So, if the time span of the batched oplogs is too large,
 * the delay time is inaccurate.
 * the second return value marks whether the caller should exit.
 */
func (batcher *Batcher) getBatchWithDelay() ([]*oplog.GenericOplog, bool) {
var mergeBatch []*oplog.GenericOplog
if !batcher.utBatchesDelay.flag {
mergeBatch = batcher.getBatch()
} else { // for ut only
mergeBatch = batcher.utBatchesDelay.injectBatch
}
if mergeBatch == nil {
return mergeBatch, false
}
// judge whether we should exit
exitPoint := getExitPoint()
lastOplog := mergeBatch[len(mergeBatch)-1].Parsed
if !exitPoint.IsZero() &&
primitive.CompareTimestamp(lastOplog.Timestamp, batcher.syncer.fullSyncFinishPosition) > 0 &&
primitive.CompareTimestamp(exitPoint, lastOplog.Timestamp) < 0 {
// only run the detailed judgement when the exit point is smaller than the last oplog's timestamp
LOG.Info("%s exitPoint[%v] < lastOplog.Timestamp[%v]", batcher.syncer, exitPoint, lastOplog.Timestamp)
var i int
for i = range mergeBatch {
// fmt.Println(exitPoint, mergeBatch[i].Parsed.Timestamp)
if primitive.CompareTimestamp(exitPoint, mergeBatch[i].Parsed.Timestamp) < 0 {
LOG.Info("%s exitPoint[%v] < current.Timestamp[%v]", batcher.syncer,
exitPoint, mergeBatch[i].Parsed.Timestamp)
break
}
}
return mergeBatch[:i], true
}
// judge whether should delay
delay := getTargetDelay()
if delay > 0 {
firstOplog := mergeBatch[0].Parsed
// do not apply the delay when the oplog timestamp is not later than fullSyncFinishPosition
if primitive.CompareTimestamp(firstOplog.Timestamp, batcher.syncer.fullSyncFinishPosition) > 0 {
for {
// only sleep while delay > 0
// re-fetch the delay every round so that runtime changes take effect
delay = getTargetDelay()
delayBoundary := time.Now().Unix() - delay + 3 // 3 is for NTP drift
if utils.ExtractMongoTimestamp(firstOplog.Timestamp) > delayBoundary {
LOG.Info("%s --- wait target delay[%v seconds]: "+
"first oplog timestamp[%v] > delayBoundary[%v], fullSyncFinishPosition[%v]",
batcher.syncer, delay, firstOplog.Timestamp, delayBoundary,
batcher.syncer.fullSyncFinishPosition)
time.Sleep(5 * time.Second)
// for ut only
batcher.utBatchesDelay.delay++
} else {
break
}
}
}
}
return mergeBatch, false
}
// BatchMore
/**
 * this function is used to gather oplogs together.
 * honestly speaking, it's complicated, so reading the unit tests may help make
 * it clearer. The reason this function is so complicated is that there are too
 * many corner cases here.
 * returns batched oplogs and a barrier flag.
 * the barrier is set when a DDL is found, e.g. for the input sequence
 *     i d i c u i
 * the "c" (DDL) oplog is split out as its own barrier batch.
 */
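//
// A minimal sketch of a driving loop (illustrative only; the real caller lives in
// the syncer and also handles acking and checkpoints):
//
//	for {
//		batchedOplog, barrier, allEmpty, exit := batcher.BatchMore()
//		if worked := batcher.dispatchBatches(batchedOplog); worked && barrier {
//			// a barrier batch must be fully acked before fetching the next one
//		}
//		_ = allEmpty // when true, everything in this round was filtered
//		if exit {
//			break
//		}
//	}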
func (batcher *Batcher) BatchMore() (genericOplogs [][]*oplog.GenericOplog, barrier bool, allEmpty bool, exit bool) {
// picked raw oplogs and batching in sequence
batcher.batchGroup = make([][]*oplog.GenericOplog, len(batcher.workerGroup))
if batcher.barrierOplogs == nil {
batcher.barrierOplogs = make([]*oplog.GenericOplog, 0)
}
// there are barrier oplogs waiting to be performed
if len(batcher.barrierOplogs) > 0 {
for _, v := range batcher.barrierOplogs {
if batcher.filter(v.Parsed) {
batcher.lastFilterOplog = v.Parsed
continue
}
if ddlFilter.Filter(v.Parsed) && !conf.Options.FilterDDLEnable {
batcher.lastFilterOplog = v.Parsed
continue
}
batcher.addIntoBatchGroup(v, true)
//LOG.Info("%s transfer barrierOplogs into batchGroup, i[%d], oplog[%v]", batcher.syncer, i, v.Parsed)
}
batcher.barrierOplogs = nil
return batcher.batchGroup, true, batcher.setLastOplog(), false
}
// try to get batch
mergeBatch, exit := batcher.getBatchWithDelay()
if mergeBatch == nil {
return batcher.batchGroup, false, batcher.setLastOplog(), exit
}
for i, genericLog := range mergeBatch {
//LOG.Info("~~~~~~~~~enter_input %v %v\n", i, genericLog.Parsed)
// filter oplogs such as noops or those dropped by gid
// PAY ATTENTION: we can't handle an oplog inside a transaction once it has been filtered
if batcher.filter(genericLog.Parsed) {
// don't push to worker, set lastFilterOplog
batcher.lastFilterOplog = genericLog.Parsed
//LOG.Info("~~~~~~~~~filter %v %v", i, genericLog.Parsed)
continue
}
// Transaction
if txnMeta, txnOk := batcher.isTransaction(genericLog.Parsed); txnOk {
//LOG.Info("~~~~~~~~~transaction %v %v", i, genericLog.Parsed)
isRet, mustIndividual, _, deliveredOps := batcher.handleTransaction(txnMeta, genericLog)
if !isRet {
continue
}
if mustIndividual {
batcher.barrierOplogs = deliveredOps
batcher.remainLogs = mergeBatch[i+1:]
allEmpty := batcher.setLastOplog()
nimo.AssertTrue(allEmpty, "batcher.batchGroup must be empty")
return batcher.batchGroup, true, allEmpty, false
} else {
// TODO: filtering still needs to be applied here
for _, ele := range deliveredOps {
batcher.addIntoBatchGroup(ele, false)
}
continue
}
}
// a non-transaction applyOps command: unpack its inner oplogs
if genericLog.Parsed.Operation == "c" {
operation, _ := oplog.ExtraCommandName(genericLog.Parsed.Object)
if operation == "applyOps" {
deliveredOps, err := oplog.ExtractInnerOps(&genericLog.Parsed.ParsedLog)
if err != nil {
LOG.Crashf("applyOps extract failed. err[%v] oplog[%v]",
err, genericLog.Parsed.ParsedLog)
}
// TODO: filtering still needs to be applied here
for _, ele := range deliveredOps {
batcher.addIntoBatchGroup(&oplog.GenericOplog{
Raw: nil,
Parsed: &oplog.PartialLog{
ParsedLog: ele,
},
}, false)
}
continue
}
}
// the current oplog is a DDL
if ddlFilter.Filter(genericLog.Parsed) {
if conf.Options.FilterDDLEnable {
batcher.barrierOplogs = append(batcher.barrierOplogs, genericLog)
batcher.remainLogs = mergeBatch[i+1:]
return batcher.batchGroup, true, batcher.setLastOplog(), false
} else {
// filter
batcher.syncer.replMetric.AddFilter(1)
// doesn't push to worker, set lastFilterOplog
batcher.lastFilterOplog = genericLog.Parsed
continue
}
}
batcher.addIntoBatchGroup(genericLog, false)
}
return batcher.batchGroup, false, batcher.setLastOplog(), exit
}
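// setLastOplog records the newest oplog across batchGroup as lastOplog; it returns
// true when every batch is empty, i.e. everything in this round was filtered.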
func (batcher *Batcher) setLastOplog() bool {
// all oplogs are filtered?
allEmpty := true
for _, ele := range batcher.batchGroup {
if ele != nil && len(ele) > 0 {
allEmpty = false
rawLast := ele[len(ele)-1]
if primitive.CompareTimestamp(rawLast.Parsed.Timestamp, batcher.lastOplog.Parsed.Timestamp) > 0 {
batcher.lastOplog = rawLast
}
}
}
return allEmpty
}
// addIntoBatchGroup appends genericLog to the batch group.
// isBarrier: barrier oplogs (like DDL or transactions) must execute sequentially and
// separately, so they are always sent to batchGroup[0]
func (batcher *Batcher) addIntoBatchGroup(genericLog *oplog.GenericOplog, isBarrier bool) {
if genericLog == fakeOplog {
return
}
batcher.handler.Handle(genericLog.Parsed)
var which uint32
if isBarrier {
which = 0
} else {
which = batcher.syncer.hasher.DistributeOplogByMod(genericLog.Parsed, len(batcher.workerGroup))
}
batcher.batchGroup[which] = append(batcher.batchGroup[which], genericLog)
// LOG.Debug("add into worker[%v]: %v", which, genericLog.Parsed.ParsedLog)
}
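// isTransaction reports whether partialLog belongs to a transaction, returning its
// parsed transaction metadata when it does.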
func (batcher *Batcher) isTransaction(partialLog *oplog.PartialLog) (oplog.TxnMeta, bool) {
//LOG.Info("isTransaction input oplog:%v lsid[%v] TxnNumber[%v] Object[%v]", partialLog,
// partialLog.ParsedLog.LSID, partialLog.ParsedLog.TxnNumber, partialLog.ParsedLog.Object)
if partialLog.Operation == "c" {
txnMeta, err := oplog.NewTxnMeta(partialLog.ParsedLog)
if err != nil {
return oplog.TxnMeta{}, false
}
return txnMeta, txnMeta.IsTxn()
}
return oplog.TxnMeta{}, false
}
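// handleTransaction buffers transaction oplogs until the transaction resolves.
// isRet reports whether deliveredOps is ready to be consumed (i.e. the transaction
// committed); mustIndividual means the ops must run as a separate barrier batch;
// mustSerial means the transaction contains a command and must be executed serially.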
func (batcher *Batcher) handleTransaction(txnMeta oplog.TxnMeta,
genericLog *oplog.GenericOplog) (isRet bool, mustIndividual bool, mustSerial bool,
deliveredOps []*oplog.GenericOplog) {
err := batcher.txnBuffer.AddOp(txnMeta, genericLog.Parsed.ParsedLog)
if err != nil {
LOG.Crashf("%s add oplog to txnbuffer failed, err[%v] oplog[%v]",
batcher.syncer, err, genericLog.Parsed.ParsedLog)
}
// the distributed transaction is aborted: ignore these oplogs and clear the buffer
if txnMeta.IsAbort() {
err := batcher.txnBuffer.PurgeTxn(txnMeta)
if err != nil {
LOG.Crashf("%s cleaning up txnBuffer failed, err[%v] oplog[%v]",
batcher.syncer, err, genericLog.Parsed.ParsedLog)
}
batcher.syncer.replMetric.AddFilter(1)
batcher.lastFilterOplog = genericLog.Parsed
return false, false, false, nil
}
if !txnMeta.IsCommit() {
// the transaction cannot be committed yet
return false, false, false, nil
}
haveCommandInTransaction := false
mustIndividual = true
mustSerial = false
// the transaction can be committed now
ops, errs := batcher.txnBuffer.GetTxnStream(txnMeta)
Loop:
for {
select {
case o, ok := <-ops:
if !ok {
break Loop
}
newOplog := &oplog.PartialLog{
ParsedLog: o,
}
if newOplog.Operation == "c" {
haveCommandInTransaction = true
}
// Raw will be filled in during Send->LogEntryEncode
deliveredOps = append(deliveredOps,
&oplog.GenericOplog{
Raw: nil,
Parsed: newOplog,
})
case err := <-errs:
if err != nil {
LOG.Crashf("error replaying transaction, err[%v]", err)
}
break Loop
}
}
// an individual transaction that has no command inside and no previous op can run together with other CRUD oplogs
if !txnMeta.IsCommitOp() && !haveCommandInTransaction &&
genericLog.Parsed.PrevOpTime.String() == emptyPrevRaw.String() {
mustIndividual = false
}
// a transaction that contains a command must be executed serially
if haveCommandInTransaction {
mustSerial = true
}
err = batcher.txnBuffer.PurgeTxn(txnMeta)
if err != nil {
LOG.Crashf("error cleaning up transaction buffer, err[%v]", err)
}
return true, mustIndividual, mustSerial, deliveredOps
}
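// moveToNextQueue advances the round-robin cursor over the syncer's logsQueue.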
func (batcher *Batcher) moveToNextQueue() {
batcher.nextQueue = (batcher.nextQueue + 1) % uint64(len(batcher.syncer.logsQueue))
}
func (batcher *Batcher) currentQueue() uint64 {
return batcher.nextQueue
}