in streamer/snapshot.go [197:272]
// streamFromConsistentSnapshot streams a snapshot obtained from snapshot.Start
// into a producer created for s.topic, applying the given throttles.
//
// throttleMB is multiplied by 1024*1024 before being handed to throttle.New;
// throttleIOPS is passed through as is.
//
// It returns true in two cases:
//   - the producer could not be created because the target already exists
//     (os.IsExist) and state.ClearNeedSnapshot succeeded;
//   - the stream loop finished, no shutdown was initiated, the producer was
//     closed, and state.ClearNeedSnapshot succeeded.
//
// On every other exit after the producer has been created, the deferred
// handler bumps the snapshot error counter and calls CloseOnFailure on the
// producer.
func (s *Streamer) streamFromConsistentSnapshot(throttleMB int64, throttleIOPS int64) bool {
	sm := metrics.NewSnapshotMetrics("", s.getTag())
	sm.NumWorkers.Inc()
	defer func() {
		sm.NumWorkers.Dec()
	}()

	begin := time.Now()

	producer, err := s.outPipe.NewProducer(s.topic)
	if err != nil {
		if os.IsExist(err) {
			s.log.Infof("Snapshot already exists")
			if err = state.ClearNeedSnapshot(s.row.ID, s.row.SnapshottedAt); err == nil {
				return true
			}
		}
		log.EL(s.log, err)
		sm.Errors.Inc(1)
		return false
	}
	producer.SetFormat(s.row.OutputFormat)

	// finished flips to true only at the very end; any earlier return leaves
	// it false so the handler below performs the failure cleanup.
	finished := false
	defer func() {
		if finished {
			return
		}
		sm.Errors.Inc(1)
		log.EL(s.log, producer.CloseOnFailure())
	}()

	s.log.Infof("Starting consistent snapshot streamer for: %v, %v", s.topic, s.outEncoder.Type())

	// For JSON format push schema as a first message of the stream
	if !s.pushSchema(producer) {
		return false
	}

	reader, err := snapshot.Start(s.row.Input, s.row.Service, s.row.Cluster, s.row.DB, s.row.Table, s.row.Params, s.outEncoder, sm)
	if log.EL(s.log, err) {
		return false
	}
	defer reader.End()

	iopsLimiter := throttle.New(throttleIOPS, 1000000, 3)
	defer iopsLimiter.Close()

	mbLimiter := throttle.New(throttleMB*1024*1024, 1000000, 3)
	defer mbLimiter.Close()

	if throttleIOPS != 0 || throttleMB != 0 {
		s.log.Debugf("Snapshot throttle enabled: %v IOPS, %v MBs", throttleIOPS, throttleMB)
	}

	cancelTick := time.NewTicker(cancelCheckInterval)
	defer cancelTick.Stop()

	// The shutdown check is only evaluated when the loop itself reported
	// success, matching the order of the two separate checks it replaces.
	if !s.streamLoop(reader, producer, iopsLimiter, mbLimiter, cancelTick, sm) || shutdown.Initiated() {
		return false
	}

	if err = producer.Close(); err != nil {
		return !log.EL(s.log, err)
	}

	// Duration and size metrics are recorded whether or not clearing the
	// need-snapshot state succeeds.
	err = state.ClearNeedSnapshot(s.row.ID, s.row.SnapshottedAt)
	sm.Duration.Record(time.Since(begin))
	sm.SizeRead.Set(sm.BytesRead.Get())
	sm.SizeWritten.Set(sm.BytesWritten.Get())

	finished = err == nil
	return !log.EL(s.log, err)
}