func()

in collector/batcher.go [280:397]


func (batcher *Batcher) BatchMore() (genericOplogs [][]*oplog.GenericOplog, barrier bool, allEmpty bool, exit bool) {
	// picked raw oplogs and batching in sequence
	batcher.batchGroup = make([][]*oplog.GenericOplog, len(batcher.workerGroup))
	if batcher.barrierOplogs == nil {
		batcher.barrierOplogs = make([]*oplog.GenericOplog, 0)
	}

	// Have barrier Oplogs to performed
	if len(batcher.barrierOplogs) > 0 {
		for _, v := range batcher.barrierOplogs {
			if batcher.filter(v.Parsed) {
				batcher.lastFilterOplog = v.Parsed
				continue
			}
			if ddlFilter.Filter(v.Parsed) && !conf.Options.FilterDDLEnable {
				batcher.lastFilterOplog = v.Parsed
				continue
			}

			batcher.addIntoBatchGroup(v, true)
			//LOG.Info("%s transfer barrierOplogs into batchGroup, i[%d], oplog[%v]", batcher.syncer, i, v.Parsed)
		}
		batcher.barrierOplogs = nil

		return batcher.batchGroup, true, batcher.setLastOplog(), false
	}

	// try to get batch
	mergeBatch, exit := batcher.getBatchWithDelay()

	if mergeBatch == nil {
		return batcher.batchGroup, false, batcher.setLastOplog(), exit
	}

	for i, genericLog := range mergeBatch {
		//LOG.Info("~~~~~~~~~enter_input %v %v\n", i, genericLog.Parsed)

		// filter oplog such like Noop or Gid-filtered
		// PAY ATTENTION: we can't handle the oplog in transaction that has been filtered
		if batcher.filter(genericLog.Parsed) {
			// don't push to worker, set lastFilterOplog
			batcher.lastFilterOplog = genericLog.Parsed
			//LOG.Info("~~~~~~~~~filter %v %v", i, genericLog.Parsed)
			continue
		}

		// Transactoin
		if txnMeta, txnOk := batcher.isTransaction(genericLog.Parsed); txnOk {
			//LOG.Info("~~~~~~~~~transaction %v %v", i, genericLog.Parsed)
			isRet, mustIndividual, _, deliveredOps := batcher.handleTransaction(txnMeta, genericLog)
			if !isRet {
				continue
			}
			if mustIndividual {
				batcher.barrierOplogs = deliveredOps

				batcher.remainLogs = mergeBatch[i+1:]

				allEmpty := batcher.setLastOplog()
				nimo.AssertTrue(allEmpty == true, "batcher.batchGroup don't be empty")
				return batcher.batchGroup, true, allEmpty, false
			} else {

				// TODO need do filter
				for _, ele := range deliveredOps {
					batcher.addIntoBatchGroup(ele, false)
				}
				continue
			}
		}

		// no transaction applyOps
		if genericLog.Parsed.Operation == "c" {
			operation, _ := oplog.ExtraCommandName(genericLog.Parsed.Object)
			if operation == "applyOps" {
				deliveredOps, err := oplog.ExtractInnerOps(&genericLog.Parsed.ParsedLog)
				if err != nil {
					LOG.Crashf("applyOps extract failed. err[%v] oplog[%v]",
						err, genericLog.Parsed.ParsedLog)
				}

				// TODO need do filter
				for _, ele := range deliveredOps {
					batcher.addIntoBatchGroup(&oplog.GenericOplog{
						Raw: nil,
						Parsed: &oplog.PartialLog{
							ParsedLog: ele,
						},
					}, false)
				}
				continue
			}
		}

		// current is ddl
		if ddlFilter.Filter(genericLog.Parsed) {

			if conf.Options.FilterDDLEnable {
				batcher.barrierOplogs = append(batcher.barrierOplogs, genericLog)

				batcher.remainLogs = mergeBatch[i+1:]

				return batcher.batchGroup, true, batcher.setLastOplog(), false
			} else {
				// filter
				batcher.syncer.replMetric.AddFilter(1)
				// doesn't push to worker, set lastFilterOplog
				batcher.lastFilterOplog = genericLog.Parsed

				continue
			}
		}

		batcher.addIntoBatchGroup(genericLog, false)
	}

	return batcher.batchGroup, false, batcher.setLastOplog(), exit
}