in collector/checkpoint.go [149:199]
func (sync *OplogSyncer) calculateWorkerLowestCheckpoint() (v int64, err error) {
// no need to lock here; eventual consistency is acceptable
allAcked := true
candidates := make([]int64, 0, len(sync.batcher.workerGroup))
allAckValues := make([]int64, 0, len(sync.batcher.workerGroup))
for _, worker := range sync.batcher.workerGroup {
// read the ack value first so a racing update can never make the
// snapshot show ack > unack: unack only moves forward, so by the time
// it is loaded it can only have grown past the ack we already read.
// the atomic loads also rule out CPU/compiler reordering of the reads
ack := atomic.LoadInt64(&worker.ack)
unack := atomic.LoadInt64(&worker.unack)
if ack == 0 && unack == 0 {
// no oplogs have been synced by this worker yet. skip it
} else if ack == unack || worker.IsAllAcked() {
// all oplogs have been acked, either right now or in a previous round
worker.AllAcked(true)
allAckValues = append(allAckValues, ack)
} else if unack > ack {
// the most common case: only part of the oplogs are acked (ack may still be 0)
candidates = append(candidates, ack)
allAcked = false
} else if unack < ack && unack == 0 {
// the collector has restarted and the receiver's unack value comes
// from its buffer. this rarely happens, and since we have already
// been delayed for quite a long time, it is acceptable to keep waiting
allAcked = false
} else if unack < ack && unack != 0 {
// unack is lagging behind ack; this should be transient, so report
// an error and let the caller retry once they converge
return 0, fmt.Errorf("worker unack[%d] should catch up with ack[%d]", unack, ack)
}
}
if allAcked && len(allAckValues) != 0 {
// every worker is fully acked, so it is safe to pick the maximum.
// after the ascending sort the last element is the largest
sort.Sort(utils.Int64Slice(allAckValues))
return allAckValues[len(allAckValues)-1], nil
}
if len(candidates) == 0 {
return 0, errors.New("no candidates ack values found")
}
// ascending order: the first element is the smallest
sort.Sort(utils.Int64Slice(candidates))
if candidates[0] == 0 {
return 0, errors.New("smallest candidates is zero")
}
LOG.Info("worker offsets %v, using lowest %v", candidates, utils.ExtractTimestampForLog(candidates[0]))
return candidates[0], nil
}
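
For reference, the self-contained sketch below restates the selection rule: when every worker is fully acked the checkpoint may advance to the largest ack; otherwise it must hold at the smallest ack among partially-acked workers, since persisting anything newer could skip unacked oplogs after a crash. The mockWorker type, the lowestCheckpoint name, and the use of sort.Slice instead of utils.Int64Slice are illustrative assumptions, not code from this repository; the IsAllAcked/AllAcked bookkeeping and the restart branches are deliberately omitted.

package main

import (
	"errors"
	"fmt"
	"sort"
	"sync/atomic"
)

// mockWorker is a hypothetical stand-in for the real worker type; it keeps
// only the two counters the checkpoint calculation reads.
type mockWorker struct {
	ack   int64 // newest oplog timestamp the receiver has acknowledged
	unack int64 // newest oplog timestamp sent but not yet acknowledged
}

// lowestCheckpoint restates the selection rule above, minus the
// IsAllAcked/AllAcked bookkeeping and the collector-restart branch.
func lowestCheckpoint(workers []*mockWorker) (int64, error) {
	var acked, candidates []int64
	for _, w := range workers {
		ack := atomic.LoadInt64(&w.ack) // ack loaded first, as in the original
		unack := atomic.LoadInt64(&w.unack)
		switch {
		case ack == 0 && unack == 0:
			// idle worker: nothing synced yet, ignore it
		case ack == unack:
			acked = append(acked, ack)
		case unack > ack:
			candidates = append(candidates, ack)
		default:
			return 0, fmt.Errorf("unack[%d] lags behind ack[%d]", unack, ack)
		}
	}
	if len(candidates) == 0 {
		if len(acked) == 0 {
			return 0, errors.New("no ack values found")
		}
		sort.Slice(acked, func(i, j int) bool { return acked[i] < acked[j] })
		return acked[len(acked)-1], nil // all acked: advance to the maximum
	}
	sort.Slice(candidates, func(i, j int) bool { return candidates[i] < candidates[j] })
	if candidates[0] == 0 {
		return 0, errors.New("smallest candidate ack value is zero")
	}
	return candidates[0], nil // gated by the slowest partially-acked worker
}

func main() {
	workers := []*mockWorker{{ack: 90, unack: 120}, {ack: 100, unack: 100}}
	cp, _ := lowestCheckpoint(workers)
	fmt.Println(cp) // prints 90
}

Running the sketch prints 90: even though one worker has fully acked up to 100, the checkpoint stays at the slowest worker's ack, so no unacknowledged oplog can be skipped on recovery.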