in streamer/buffer.go [217:287]
// streamTable forwards messages read from consumer to a producer created on
// the output pipe for s.topic. The output batch is committed whenever the
// number of events or bytes produced since the last commit reaches the
// limits in the output pipe's config, and also each time the batch commit
// ticker fires.
//
// The consumer is closed on every return path. The saveOffsets flag handed to
// closeConsumer is cleared when producing a message or committing a batch
// fails.
//
// It returns true when the message stream ends, when bufferTickHandler
// reports false, or when shutdown has been initiated. It returns false when
// the producer cannot be created, a message cannot be produced, a batch
// cannot be committed, or the consumer reports an error.
//
// NOTE(review): events produced since the last commitBatch are not committed
// explicitly on the "return true" paths; presumably outProducer.Close takes
// care of them - confirm.
func (s *Streamer) streamTable(consumer pipe.Consumer) bool {
	var saveOffsets = true

	outProducer, err := s.outPipe.NewProducer(s.topic)
	if log.E(err) {
		// The deferred cleanup below is not registered yet, so release the
		// consumer here instead of leaking it. No message has been read by
		// this function at this point, hence no offsets to save.
		closeConsumer(consumer, false, s.log)
		return false
	}
	outProducer.SetFormat(s.row.OutputFormat)

	defer func() {
		closeConsumer(consumer, saveOffsets, s.log)
		log.EL(s.log, outProducer.Close())
	}()

	stateUpdateTick := time.NewTicker(s.stateUpdateInterval)
	defer stateUpdateTick.Stop()

	batchCommitTick := time.NewTicker(batchCommitInterval)
	defer batchCommitTick.Stop()

	s.log.Debugf("Beginning to stream from buffer")

	// Counters for the batch currently being accumulated; both are reset
	// after every successful commit.
	var numEvents int64
	var prevBytesWritten = s.BytesWritten

	for !shutdown.Initiated() {
		select {
		case <-stateUpdateTick.C:
			if !s.bufferTickHandler(consumer) {
				return true
			}
		case msg := <-consumer.Message():
			if msg == nil {
				s.log.Debugf("End of message stream")
				return true
			}

			if err := s.produceEvent(outProducer, msg); err != nil {
				// Pass the text as an argument, not as the format string, so
				// a '%' inside the error message is not treated as a verb.
				s.log.Errorf("%s", errors.Wrap(err, "Failed to produce message").Error())
				saveOffsets = false
				return false
			}

			numEvents++
			numBytes := s.BytesWritten - prevBytesWritten

			// Commit the batch if we have reached batch size
			if numEvents >= int64(s.outPipe.Config().MaxBatchSize) ||
				numBytes >= int64(s.outPipe.Config().MaxBatchSizeBytes) {
				if !s.commitBatch(outProducer, numEvents) {
					saveOffsets = false
					return false
				}
				numEvents = 0
				prevBytesWritten = s.BytesWritten
			}
		case err := <-consumer.Error():
			// Format err directly: wrapping a nil error (e.g. received from a
			// closed channel) yields nil, and calling Error() on that would
			// panic.
			s.log.Errorf("Failed to fetch next message: %v", err)
			return false
		case <-batchCommitTick.C:
			// It's time to commit the batch, let's emit metrics as well
			if !s.commitBatch(outProducer, numEvents) {
				saveOffsets = false
				return false
			}
			numEvents = 0
			prevBytesWritten = s.BytesWritten
		case <-shutdown.InitiatedCh():
			// Nothing to do here: the loop condition observes the shutdown
			// and terminates the loop.
		}
	}

	return true
}