func()

in collector/batcher.go [450:527]


// handleTransaction buffers one transaction-related oplog entry and, once the
// transaction commits, returns the buffered operations for delivery.
//
// The entry is always added to batcher.txnBuffer first. After that:
//   - if txnMeta reports an abort, the buffered transaction is purged, the
//     entry is counted as filtered and remembered in lastFilterOplog, and
//     nothing is delivered;
//   - if txnMeta does not report a commit yet, nothing is delivered;
//   - otherwise every buffered operation is streamed out of the buffer,
//     wrapped in a GenericOplog and returned, and the buffer is purged.
//
// Returns:
//   - isRet: true only when a committed transaction was unpacked into
//     deliveredOps.
//   - mustIndividual: false only when txnMeta is not a commit op, the
//     transaction has no command ("c") operation, and the entry's PrevOpTime
//     equals emptyPrevRaw; true in every other committed case.
//   - mustSerial: true when the transaction has a command ("c") operation.
//   - deliveredOps: the unpacked operations, nil when nothing is delivered.
//
// Any error from the transaction buffer is reported through LOG.Crashf.
func (batcher *Batcher) handleTransaction(txnMeta oplog.TxnMeta,
	genericLog *oplog.GenericOplog) (isRet bool, mustIndividual bool, mustSerial bool,
	deliveredOps []*oplog.GenericOplog) {
	if addErr := batcher.txnBuffer.AddOp(txnMeta, genericLog.Parsed.ParsedLog); addErr != nil {
		LOG.Crashf("%s add oplog to txnbuffer failed, err[%v] oplog[%v]",
			batcher.syncer, addErr, genericLog.Parsed.ParsedLog)
	}

	switch {
	case txnMeta.IsAbort():
		// The transaction was aborted: drop everything buffered for it and
		// account for this entry as filtered.
		if purgeErr := batcher.txnBuffer.PurgeTxn(txnMeta); purgeErr != nil {
			LOG.Crashf("%s cleaning up txnBuffer failed, err[%v] oplog[%v]",
				batcher.syncer, purgeErr, genericLog.Parsed.ParsedLog)
		}
		batcher.syncer.replMetric.AddFilter(1)
		batcher.lastFilterOplog = genericLog.Parsed
		return false, false, false, nil
	case !txnMeta.IsCommit():
		// Not committed yet: keep buffering.
		return false, false, false, nil
	}

	// The transaction is committed: drain its buffered operations.
	hasCommand := false
	txnOps, txnErrs := batcher.txnBuffer.GetTxnStream(txnMeta)
	streaming := true
	for streaming {
		select {
		case parsed, open := <-txnOps:
			if !open {
				streaming = false
				continue
			}

			entry := &oplog.PartialLog{ParsedLog: parsed}
			hasCommand = hasCommand || entry.Operation == "c"

			// Raw stays nil here; it is filled in later by
			// Send->LogEntryEncode.
			deliveredOps = append(deliveredOps, &oplog.GenericOplog{
				Raw:    nil,
				Parsed: entry,
			})
		case streamErr := <-txnErrs:
			if streamErr != nil {
				LOG.Crashf("error replaying transaction, err[%v]", streamErr)
			}
			streaming = false
		}
	}

	// A command-free transaction whose entry is not a commit op and has an
	// empty PrevOpTime may be batched with other CRUD oplogs; anything else
	// has to be handled on its own.
	mustIndividual = txnMeta.IsCommitOp() || hasCommand ||
		genericLog.Parsed.PrevOpTime.String() != emptyPrevRaw.String()
	// A transaction that contains a command must be applied serially.
	mustSerial = hasCommand

	if purgeErr := batcher.txnBuffer.PurgeTxn(txnMeta); purgeErr != nil {
		LOG.Crashf("error cleaning up transaction buffer, err[%v]", purgeErr)
	}

	return true, mustIndividual, mustSerial, deliveredOps
}