in collector/batcher.go [280:397]
func (batcher *Batcher) BatchMore() (genericOplogs [][]*oplog.GenericOplog, barrier bool, allEmpty bool, exit bool) {
// picked raw oplogs and batching in sequence
batcher.batchGroup = make([][]*oplog.GenericOplog, len(batcher.workerGroup))
if batcher.barrierOplogs == nil {
batcher.barrierOplogs = make([]*oplog.GenericOplog, 0)
}
// Have barrier Oplogs to performed
if len(batcher.barrierOplogs) > 0 {
for _, v := range batcher.barrierOplogs {
if batcher.filter(v.Parsed) {
batcher.lastFilterOplog = v.Parsed
continue
}
if ddlFilter.Filter(v.Parsed) && !conf.Options.FilterDDLEnable {
batcher.lastFilterOplog = v.Parsed
continue
}
batcher.addIntoBatchGroup(v, true)
//LOG.Info("%s transfer barrierOplogs into batchGroup, i[%d], oplog[%v]", batcher.syncer, i, v.Parsed)
}
batcher.barrierOplogs = nil
return batcher.batchGroup, true, batcher.setLastOplog(), false
}
// try to get batch
mergeBatch, exit := batcher.getBatchWithDelay()
if mergeBatch == nil {
return batcher.batchGroup, false, batcher.setLastOplog(), exit
}
for i, genericLog := range mergeBatch {
//LOG.Info("~~~~~~~~~enter_input %v %v\n", i, genericLog.Parsed)
// filter oplog such like Noop or Gid-filtered
// PAY ATTENTION: we can't handle the oplog in transaction that has been filtered
if batcher.filter(genericLog.Parsed) {
// don't push to worker, set lastFilterOplog
batcher.lastFilterOplog = genericLog.Parsed
//LOG.Info("~~~~~~~~~filter %v %v", i, genericLog.Parsed)
continue
}
// Transactoin
if txnMeta, txnOk := batcher.isTransaction(genericLog.Parsed); txnOk {
//LOG.Info("~~~~~~~~~transaction %v %v", i, genericLog.Parsed)
isRet, mustIndividual, _, deliveredOps := batcher.handleTransaction(txnMeta, genericLog)
if !isRet {
continue
}
if mustIndividual {
batcher.barrierOplogs = deliveredOps
batcher.remainLogs = mergeBatch[i+1:]
allEmpty := batcher.setLastOplog()
nimo.AssertTrue(allEmpty == true, "batcher.batchGroup don't be empty")
return batcher.batchGroup, true, allEmpty, false
} else {
// TODO need do filter
for _, ele := range deliveredOps {
batcher.addIntoBatchGroup(ele, false)
}
continue
}
}
// no transaction applyOps
if genericLog.Parsed.Operation == "c" {
operation, _ := oplog.ExtraCommandName(genericLog.Parsed.Object)
if operation == "applyOps" {
deliveredOps, err := oplog.ExtractInnerOps(&genericLog.Parsed.ParsedLog)
if err != nil {
LOG.Crashf("applyOps extract failed. err[%v] oplog[%v]",
err, genericLog.Parsed.ParsedLog)
}
// TODO need do filter
for _, ele := range deliveredOps {
batcher.addIntoBatchGroup(&oplog.GenericOplog{
Raw: nil,
Parsed: &oplog.PartialLog{
ParsedLog: ele,
},
}, false)
}
continue
}
}
// current is ddl
if ddlFilter.Filter(genericLog.Parsed) {
if conf.Options.FilterDDLEnable {
batcher.barrierOplogs = append(batcher.barrierOplogs, genericLog)
batcher.remainLogs = mergeBatch[i+1:]
return batcher.batchGroup, true, batcher.setLastOplog(), false
} else {
// filter
batcher.syncer.replMetric.AddFilter(1)
// doesn't push to worker, set lastFilterOplog
batcher.lastFilterOplog = genericLog.Parsed
continue
}
}
batcher.addIntoBatchGroup(genericLog, false)
}
return batcher.batchGroup, false, batcher.setLastOplog(), exit
}