// Excerpt from streamer/buffer.go (lines 217-287).

// streamTable drains buffered events from the given consumer and publishes
// them to the output pipe, committing in batches. A batch is committed when
// it reaches the configured event-count or byte-size limit, or when the
// periodic commit ticker fires. It returns true on a clean stop (end of
// stream, state change, or shutdown) and false on a produce/commit/fetch
// failure; on failure consumer offsets are not persisted, so the
// uncommitted events will be redelivered on restart.
func (s *Streamer) streamTable(consumer pipe.Consumer) bool {
	persistOffsets := true

	producer, err := s.outPipe.NewProducer(s.topic)
	if log.E(err) {
		return false
	}
	producer.SetFormat(s.row.OutputFormat)

	// Tear down in reverse: release the consumer (persisting offsets only
	// if everything committed cleanly), then close the producer.
	defer func() {
		closeConsumer(consumer, persistOffsets, s.log)
		log.EL(s.log, producer.Close())
	}()

	stateTick := time.NewTicker(s.stateUpdateInterval)
	defer stateTick.Stop()

	commitTick := time.NewTicker(batchCommitInterval)
	defer commitTick.Stop()

	s.log.Debugf("Beginning to stream from buffer")

	var batchEvents int64
	batchStartBytes := s.BytesWritten

	// flushBatch commits the current batch and resets the batch counters.
	// On commit failure it disables offset persistence so the events are
	// redelivered, and reports false.
	flushBatch := func() bool {
		if !s.commitBatch(producer, batchEvents) {
			persistOffsets = false
			return false
		}
		batchEvents = 0
		batchStartBytes = s.BytesWritten
		return true
	}

	for !shutdown.Initiated() {
		select {
		case <-stateTick.C:
			if !s.bufferTickHandler(consumer) {
				return true
			}
		case msg := <-consumer.Message():
			if msg == nil {
				s.log.Debugf("End of message stream")
				return true
			}

			if err := s.produceEvent(producer, msg); err != nil {
				s.log.Errorf(errors.Wrap(err, "Failed to produce message").Error())
				persistOffsets = false
				return false
			}
			batchEvents++

			// Commit once the batch hits either the event-count or the
			// byte-size limit.
			if batchEvents >= int64(s.outPipe.Config().MaxBatchSize) ||
				s.BytesWritten-batchStartBytes >= int64(s.outPipe.Config().MaxBatchSizeBytes) {
				if !flushBatch() {
					return false
				}
			}
		case err := <-consumer.Error():
			s.log.Errorf(errors.Wrap(err, "Failed to fetch next message").Error())
			return false
		case <-commitTick.C:
			// Time-based commit so a small batch never lingers unflushed.
			if !flushBatch() {
				return false
			}
		case <-shutdown.InitiatedCh():
			// Wake the loop so its condition re-checks shutdown.Initiated().
		}
	}

	return true
}