func()

in streamer/snapshot.go [197:272]


func (s *Streamer) streamFromConsistentSnapshot(throttleMB int64, throttleIOPS int64) bool {
	success := false
	snapshotMetrics := metrics.NewSnapshotMetrics("", s.getTag())

	snapshotMetrics.NumWorkers.Inc()
	defer func() {
		snapshotMetrics.NumWorkers.Dec()
	}()
	startTime := time.Now()

	outProducer, err := s.outPipe.NewProducer(s.topic)
	if err != nil {
		if os.IsExist(err) {
			s.log.Infof("Snapshot already exists")
			err = state.ClearNeedSnapshot(s.row.ID, s.row.SnapshottedAt)
			if err == nil {
				return true
			}
		}
		log.EL(s.log, err)
		snapshotMetrics.Errors.Inc(1)
		return false
	}
	outProducer.SetFormat(s.row.OutputFormat)

	defer func() {
		if !success {
			snapshotMetrics.Errors.Inc(1)
			log.EL(s.log, outProducer.CloseOnFailure())
		}
	}()

	s.log.Infof("Starting consistent snapshot streamer for: %v, %v", s.topic, s.outEncoder.Type())

	//For JSON format push schema as a first message of the stream
	if !s.pushSchema(outProducer) {
		return false
	}

	snReader, err := snapshot.Start(s.row.Input, s.row.Service, s.row.Cluster, s.row.DB, s.row.Table, s.row.Params, s.outEncoder, snapshotMetrics)
	if log.EL(s.log, err) {
		return false
	}
	defer snReader.End()

	iopsThrottler := throttle.New(throttleIOPS, 1000000, 3)
	defer iopsThrottler.Close()
	mbThrottler := throttle.New(throttleMB*1024*1024, 1000000, 3)
	defer mbThrottler.Close()
	if throttleIOPS != 0 || throttleMB != 0 {
		s.log.Debugf("Snapshot throttle enabled: %v IOPS, %v MBs", throttleIOPS, throttleMB)
	}

	ticker := time.NewTicker(cancelCheckInterval)
	defer ticker.Stop()

	if !s.streamLoop(snReader, outProducer, iopsThrottler, mbThrottler, ticker, snapshotMetrics) {
		return false
	}

	if shutdown.Initiated() {
		return false
	}

	if err = outProducer.Close(); err == nil {
		err = state.ClearNeedSnapshot(s.row.ID, s.row.SnapshottedAt)
		snapshotMetrics.Duration.Record(time.Since(startTime))
		snapshotMetrics.SizeRead.Set(snapshotMetrics.BytesRead.Get())
		snapshotMetrics.SizeWritten.Set(snapshotMetrics.BytesWritten.Get())
		if err == nil {
			success = true
		}
	}

	return !log.EL(s.log, err)
}