in collector/batcher.go [450:527]
// handleTransaction records one transaction-related oplog entry in the
// transaction buffer and, when txnMeta marks a commit, drains the buffered
// operations so they can be delivered.
//
// Results:
//   - isRet: true only when the transaction committed and deliveredOps holds
//     the drained operations; false when the transaction was aborted or is
//     not committed yet.
//   - mustIndividual: false only when txnMeta is not a commit op, none of the
//     drained operations is a command ("c"), and the entry's PrevOpTime
//     renders the same string as emptyPrevRaw; true otherwise.
//   - mustSerial: true when at least one drained operation is a command.
//   - deliveredOps: the drained operations, each wrapped in a GenericOplog
//     whose Raw is nil.
//
// Failures from the transaction buffer are reported through LOG.Crashf.
func (batcher *Batcher) handleTransaction(txnMeta oplog.TxnMeta,
	genericLog *oplog.GenericOplog) (isRet bool, mustIndividual bool, mustSerial bool,
	deliveredOps []*oplog.GenericOplog) {
	entry := genericLog.Parsed

	// Every entry is buffered first, whatever state the transaction is in.
	if addErr := batcher.txnBuffer.AddOp(txnMeta, entry.ParsedLog); addErr != nil {
		LOG.Crashf("%s add oplog to txnbuffer failed, err[%v] oplog[%v]",
			batcher.syncer, addErr, entry.ParsedLog)
	}

	// An aborted distributed transaction is dropped: discard what was
	// buffered and account for this entry as filtered.
	if txnMeta.IsAbort() {
		if purgeErr := batcher.txnBuffer.PurgeTxn(txnMeta); purgeErr != nil {
			LOG.Crashf("%s cleaning up txnBuffer failed, err[%v] oplog[%v]",
				batcher.syncer, purgeErr, entry.ParsedLog)
		}
		batcher.syncer.replMetric.AddFilter(1)
		batcher.lastFilterOplog = entry
		return false, false, false, nil
	}

	// Nothing to deliver until the transaction commits.
	if !txnMeta.IsCommit() {
		return false, false, false, nil
	}

	// The transaction committed: pull every buffered operation off the stream.
	// Draining stops when the operation channel is closed or when the error
	// channel yields a value, whichever is observed first.
	sawCommand := false
	opStream, errStream := batcher.txnBuffer.GetTxnStream(txnMeta)
	for draining := true; draining; {
		select {
		case parsed, open := <-opStream:
			if !open {
				draining = false
				continue
			}
			wrapped := &oplog.PartialLog{ParsedLog: parsed}
			sawCommand = sawCommand || wrapped.Operation == "c"
			// Raw stays nil here; per the original note it is filled in
			// later by Send->LogEntryEncode.
			deliveredOps = append(deliveredOps, &oplog.GenericOplog{
				Raw:    nil,
				Parsed: wrapped,
			})
		case streamErr := <-errStream:
			if streamErr != nil {
				LOG.Crashf("error replaying transaction, err[%v]", streamErr)
			}
			draining = false
		}
	}

	// A transaction may be batched together with ordinary CRUD oplogs only
	// when it is not a commit op, carries no command, and has an empty
	// previous op time; in every other case it must be handled individually.
	mustIndividual = txnMeta.IsCommitOp() || sawCommand ||
		entry.PrevOpTime.String() != emptyPrevRaw.String()

	// Only transactions without commands may be applied in parallel.
	mustSerial = sawCommand

	if purgeErr := batcher.txnBuffer.PurgeTxn(txnMeta); purgeErr != nil {
		LOG.Crashf("error cleaning up transaction buffer, err[%v]", purgeErr)
	}
	return true, mustIndividual, mustSerial, deliveredOps
}