in streamer/snapshot.go [44:103]
// streamBatch drains one batch of snapshot messages from snReader and pushes
// them to outProducer.
//
// The (key, pKey, outMsg) parameters carry a message that was fetched by a
// previous call but not yet pushed (the message that straddled the previous
// batch boundary); pass them back in unchanged on the next call. Similarly,
// the returned (key, pKey, outMsg) hold the first message of the NEXT batch
// when the batch boundary was hit, or ("", "", nil) when the reader was
// exhausted.
//
// Returns: (pushed, bytesWritten, eventsWritten, key, pKey, outMsg, err).
// pushed is false when no events were written (reader exhausted with nothing
// to send) or on error.
//
// Batch boundary rule: the batch ends once MaxBatchSize events or
// MaxBatchSizeBytes bytes have been pushed AND the newly fetched partition
// key differs from the last pushed one — so messages sharing a partition key
// are never split across batches, at the cost of batches slightly exceeding
// the configured limits.
func (s *Streamer) streamBatch(snReader snapshot.Reader, outProducer pipe.Producer, key string, pKey string, outMsg []byte, ticker *time.Ticker, snapshotMetrics *metrics.Snapshot) (bool, int64, int64, string, string, []byte, error) {
// i counts events pushed in this batch; b counts payload bytes pushed.
var i, b int
// prevKey is the partition key of the last successfully pushed message;
// used to avoid splitting a run of equal partition keys across batches.
var prevKey string
var err error
// Time the whole batch (fetch + push) under ReadLatency: Start() runs now,
// Stop() fires when the function returns.
defer snapshotMetrics.ReadLatency.Start().Stop()
cfg := s.outPipe.Config()
for outMsg != nil || snReader.FetchNext() { //use last message from prev batch or fetch next one
// Only fetch when we don't already hold a carried-over message;
// FetchNext() in the loop condition has already advanced the reader.
if outMsg == nil {
key, pKey, outMsg, err = snReader.Pop()
if log.EL(s.log, err) {
return false, 0, 0, "", "", nil, err
}
}
//Commit when batch full and partition key different from previous is fetched
//This means that batch can be a little bigger than MaxBatchSize
if (i >= cfg.MaxBatchSize || b >= cfg.MaxBatchSizeBytes) && pKey != prevKey {
// The just-fetched, not-yet-pushed message is returned to the
// caller below so it opens the next batch.
break
}
// Empty payloads are dropped: not pushed, not counted, and prevKey is
// deliberately left pointing at the last message actually pushed.
if len(outMsg) == 0 {
outMsg = nil
} else {
b += len(outMsg)
key = outProducer.PartitionKey("snapshot", key)
err = outProducer.PushBatch(key, outMsg)
if log.EL(s.log, err) {
return false, 0, 0, "", "", nil, err
}
i++
outMsg = nil
prevKey = pKey
}
// Non-blocking periodic tick: lets snapshotTickHandler run (e.g. to
// check for cancellation) without stalling the copy loop.
if ticker != nil {
select {
case <-ticker.C:
if err := s.snapshotTickHandler(snReader); err != nil {
s.log.Warnf("Snapshot cancelled: %v", err)
return false, 0, 0, "", "", nil, err
}
default:
}
}
}
// NOTE(review): records the event COUNT encoded as a Duration in
// milliseconds — presumably a quirk of the metrics API; confirm against
// metrics.Snapshot before changing.
snapshotMetrics.BatchSize.Record(time.Duration(i) * time.Millisecond)
// Nothing pushed: reader is exhausted (outMsg must be nil here, since a
// break requires i > 0), so there is no carry-over message to return.
if i == 0 {
return false, 0, 0, "", "", nil, nil
}
snapshotMetrics.BytesWritten.Inc(int64(b))
snapshotMetrics.EventsWritten.Inc(int64(i))
// outMsg (with its key/pKey) is non-nil only when the batch-size break
// fired; the caller must feed it into the next streamBatch call.
return true, int64(b), int64(i), key, pKey, outMsg, nil
}