func()

in streamer/snapshot.go [151:193]


// streamLoop runs the snapshot streaming cycle until shutdown is initiated,
// streamBatch signals that the loop should stop, or one of the steps fails.
//
// Each iteration streams one batch with streamBatch, commits it with
// commitWithRetry, sleeps for the longer of the two delays advised by
// iopsThrottler (fed the batch's event count) and mbThrottler (fed the batch's
// byte count), and finally performs a non-blocking check of ticker to run
// snapshotTickHandler.
//
// It returns false when streamBatch returns an error, when commitWithRetry
// reports failure, or when snapshotTickHandler returns an error. It returns
// true when streamBatch's first result is false or when shutdown has been
// initiated.
func (s *Streamer) streamLoop(snReader snapshot.Reader, outProducer pipe.Producer, iopsThrottler, mbThrottler *throttle.Throttle, ticker *time.Ticker, snapshotMetrics *metrics.Snapshot) bool {
	var (
		proceed     bool
		batchBytes  int64
		batchEvents int64
		key         string
		pKey        string
		outMsg      []byte
		err         error
	)

	for {
		if shutdown.Initiated() {
			return true
		}

		// key, pKey and outMsg are threaded through consecutive calls: the values
		// returned by one batch are handed back to the next.
		proceed, batchBytes, batchEvents, key, pKey, outMsg, err = s.streamBatch(snReader, outProducer, key, pKey, outMsg, ticker, snapshotMetrics)
		switch {
		case err != nil:
			return false
		case !proceed:
			return true
		}

		if committed := s.commitWithRetry(outProducer, snapshotMetrics); !committed {
			return false
		}

		// Both throttlers advise a delay in microseconds; honor the longer one.
		delayUs := iopsThrottler.Advice(batchEvents)
		if byBytes := mbThrottler.Advice(batchBytes); byBytes > delayUs {
			delayUs = byBytes
		}
		if delayUs != 0 {
			time.Sleep(time.Duration(delayUs) * time.Microsecond)
			snapshotMetrics.ThrottledUs.Inc(delayUs)
		}

		// Poll the ticker without blocking so streaming continues when no tick
		// is pending.
		select {
		case <-ticker.C:
			if tickErr := s.snapshotTickHandler(snReader); tickErr != nil {
				s.log.Warnf("Snapshot cancelled: %v", tickErr)
				return false
			}
		default:
		}
	}
}