executor/combiner.go (67 lines of code) (raw):
package executor
import (
LOG "github.com/vinllen/log4go"
)
// OplogsGroup is one batch of oplog records produced by the combiner. All
// records in a group were created or merged under the same namespace,
// operation and gid.
type OplogsGroup struct {
	// ns, op and gid form the key a record must match to join this group.
	ns, op, gid string

	// oplogRecords holds the batched records in the order they were added.
	oplogRecords []*OplogRecord
	// completionList holds the non-nil callbacks collected from the batched
	// records; completion runs them in this order.
	completionList []func()
}
// completion runs every callback queued on the group, in the order they were
// queued. Callbacks appended while completion is running are not executed by
// this call.
func (group *OplogsGroup) completion() {
	// Snapshot the slice header so the iteration bounds are fixed up front.
	pending := group.completionList
	for idx := range pending {
		pending[idx]()
	}
}
// LogsGroupCombiner packs consecutive oplog records into groups, bounded by
// the two limits below.
type LogsGroupCombiner struct {
	// maxGroupNr: a record is merged into the latest group only while that
	// group holds fewer than maxGroupNr records.
	// maxGroupSize: a record is merged only while the group's accumulated
	// RawSize plus the record's RawSize stays strictly below maxGroupSize.
	maxGroupNr, maxGroupSize int
}
// mergeToGroups walks logs in order and packs consecutive records into groups.
//
// A record joins the most recently created group only when all of these hold:
//   - the previous record carried no wait function,
//   - at least one group already exists,
//   - that group holds fewer than maxGroupNr records,
//   - the group's accumulated RawSize plus this record's RawSize is strictly
//     below maxGroupSize,
//   - the record's operation, namespace and gid equal the group's.
//
// In every other case the record opens a new group. The returned groups keep
// the input order; the result is nil when logs is empty.
func (combiner LogsGroupCombiner) mergeToGroups(logs []*OplogRecord) (groups []*OplogsGroup) {
	splitNext := false // the previous record had a wait function, so it closed its group
	batchBytes := 0    // RawSize accumulated in the newest group

	for _, record := range logs {
		entrySize := record.original.partialLog.RawSize

		// tail is the group this record may join; it stays nil when a split
		// is forced or when no group has been opened yet.
		var tail *OplogsGroup
		if n := len(groups); n > 0 && !splitNext {
			tail = groups[n-1]
		}

		joinable := tail != nil &&
			len(tail.oplogRecords) < combiner.maxGroupNr &&
			batchBytes+entrySize < combiner.maxGroupSize &&
			tail.op == record.original.partialLog.Operation &&
			tail.ns == record.original.partialLog.Namespace &&
			tail.gid == record.original.partialLog.Gid

		if joinable {
			combiner.merge(tail, record)
			batchBytes += entrySize
		} else {
			// Report the size of the group being closed before opening a new one.
			if batchBytes != 0 {
				LOG.Debug("mergeToGroups merge log with total size[%v]", batchBytes)
			}
			groups = append(groups, combiner.startNewGroup(record))
			batchBytes = entrySize
		}

		// A record with a wait function must be the last one in its group.
		splitNext = record.wait != nil
	}

	LOG.Debug("mergeToGroups merge group with total number[%v]", len(groups))
	return groups
}
// merge appends log to group's records and, when log carries a non-nil
// callback, queues that callback on the group's completion list.
func (combiner *LogsGroupCombiner) merge(group *OplogsGroup, log *OplogRecord) {
	group.oplogRecords = append(group.oplogRecords, log)

	done := log.original.callback
	if done == nil {
		// Nothing to run on completion for this record.
		return
	}
	group.completionList = append(group.completionList, done)
}
// startNewGroup builds a fresh group keyed by log's operation, namespace and
// gid, with log as its only record. The group's completion list is always
// non-nil: empty when log has no callback, otherwise holding just that
// callback.
func (combiner *LogsGroupCombiner) startNewGroup(log *OplogRecord) *OplogsGroup {
	callbacks := []func(){}
	if done := log.original.callback; done != nil {
		callbacks = []func(){done}
	}

	return &OplogsGroup{
		op:             log.original.partialLog.Operation,
		ns:             log.original.partialLog.Namespace,
		gid:            log.original.partialLog.Gid,
		oplogRecords:   []*OplogRecord{log},
		completionList: callbacks,
	}
}