in streamer/snapshot.go [151:193]
// streamLoop repeatedly calls streamBatch until shutdown is initiated,
// streamBatch returns an error, or streamBatch reports that there is no next
// batch.
//
// After every batch that reports more work, the loop:
//   - commits through commitWithRetry;
//   - asks both throttlers for advice (events for iopsThrottler, bytes for
//     mbThrottler) and sleeps for the larger value, interpreted as
//     microseconds, adding it to snapshotMetrics.ThrottledUs;
//   - performs a non-blocking check of ticker.C and, if a tick is pending,
//     runs snapshotTickHandler.
//
// It returns false when streamBatch returns an error, when commitWithRetry
// reports failure, or when snapshotTickHandler returns an error (the latter is
// logged as a warning). In every other case, including the loop ending because
// shutdown was initiated, it returns true.
func (s *Streamer) streamLoop(snReader snapshot.Reader, outProducer pipe.Producer, iopsThrottler, mbThrottler *throttle.Throttle, ticker *time.Ticker, snapshotMetrics *metrics.Snapshot) bool {
	// batchKey, batchPKey and carriedMsg are handed back by streamBatch and
	// passed into the following call, so they must outlive a single iteration.
	var (
		more                    bool
		batchBytes, batchEvents int64
		batchKey, batchPKey     string
		carriedMsg              []byte
		batchErr                error
	)

	for !shutdown.Initiated() {
		more, batchBytes, batchEvents, batchKey, batchPKey, carriedMsg, batchErr = s.streamBatch(snReader, outProducer, batchKey, batchPKey, carriedMsg, ticker, snapshotMetrics)
		if batchErr != nil {
			return false
		}
		if !more {
			// No further batch reported: finish successfully.
			return true
		}

		if committed := s.commitWithRetry(outProducer, snapshotMetrics); !committed {
			return false
		}

		// Honour whichever throttler asks for the longer pause.
		pauseUs := iopsThrottler.Advice(batchEvents)
		if mbPauseUs := mbThrottler.Advice(batchBytes); mbPauseUs > pauseUs {
			pauseUs = mbPauseUs
		}
		if pauseUs != 0 {
			time.Sleep(time.Duration(pauseUs) * time.Microsecond)
			snapshotMetrics.ThrottledUs.Inc(pauseUs)
		}

		// Non-blocking poll: only run the tick handler if a tick is already
		// waiting on the channel.
		select {
		case <-ticker.C:
			if tickErr := s.snapshotTickHandler(snReader); tickErr != nil {
				s.log.Warnf("Snapshot cancelled: %v", tickErr)
				return false
			}
		default:
		}
	}

	return true
}