func()

in streamer/snapshot.go [44:103]


// streamBatch reads messages from snReader and pushes them to outProducer
// until the batch limits from the output pipe config are reached or the
// reader has nothing more to fetch.
//
// key, pKey and outMsg describe a message carried over from a previous call:
// when outMsg is non-nil it is processed first, before anything is fetched
// from snReader. Pass a nil outMsg to start by fetching.
//
// ticker is optional; when non-nil, it is polled (non-blocking) after every
// processed message and snapshotTickHandler is invoked on each tick. An error
// from the handler aborts the batch.
//
// Return values, in order:
//   - whether at least one message was pushed in this call;
//   - number of payload bytes pushed;
//   - number of messages pushed;
//   - key, pKey and outMsg as left by the last loop iteration. When the loop
//     stopped because the batch was full, these hold the message that was
//     popped but not pushed, so it can be handed back on the next call;
//   - error from Pop, PushBatch or the tick handler. On error, and when no
//     message was pushed, all other results are zero values.
func (s *Streamer) streamBatch(snReader snapshot.Reader, outProducer pipe.Producer, key string, pKey string, outMsg []byte, ticker *time.Ticker, snapshotMetrics *metrics.Snapshot) (bool, int64, int64, string, string, []byte, error) {
	var (
		events   int    // messages pushed in this batch
		size     int    // payload bytes pushed in this batch
		lastPKey string // partition key of the most recently pushed message
		err      error
	)

	// The latency timer starts here and stops when the function returns.
	latency := snapshotMetrics.ReadLatency.Start()
	defer latency.Stop()

	cfg := s.outPipe.Config()

	for {
		// Process the carried-over message if there is one; otherwise pull
		// the next message from the reader, stopping when it is exhausted.
		if outMsg == nil {
			if !snReader.FetchNext() {
				break
			}
			key, pKey, outMsg, err = snReader.Pop()
			if log.EL(s.log, err) {
				return false, 0, 0, "", "", nil, err
			}
		}

		// A full batch is only closed once a message with a partition key
		// different from the last pushed one shows up, so the batch may end
		// up somewhat larger than the configured limits. The pending message
		// stays in outMsg and is returned to the caller.
		full := events >= cfg.MaxBatchSize || size >= cfg.MaxBatchSizeBytes
		if full && pKey != lastPKey {
			break
		}

		// Empty messages are dropped without being pushed or counted.
		if len(outMsg) != 0 {
			size += len(outMsg)

			key = outProducer.PartitionKey("snapshot", key)
			if err = outProducer.PushBatch(key, outMsg); log.EL(s.log, err) {
				return false, 0, 0, "", "", nil, err
			}

			events++
			lastPKey = pKey
		}
		outMsg = nil

		if ticker == nil {
			continue
		}

		// Non-blocking poll of the ticker.
		select {
		case <-ticker.C:
			if tickErr := s.snapshotTickHandler(snReader); tickErr != nil {
				s.log.Warnf("Snapshot cancelled: %v", tickErr)
				return false, 0, 0, "", "", nil, tickErr
			}
		default:
		}
	}

	// NOTE(review): the message count is recorded as a millisecond duration;
	// presumably BatchSize is a duration-typed metric reused as a counter — confirm.
	snapshotMetrics.BatchSize.Record(time.Duration(events) * time.Millisecond)

	if events == 0 {
		return false, 0, 0, "", "", nil, nil
	}

	snapshotMetrics.BytesWritten.Inc(int64(size))
	snapshotMetrics.EventsWritten.Inc(int64(events))

	return true, int64(size), int64(events), key, pKey, outMsg, nil
}