streamer/snapshot.go (204 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package streamer

import (
	"fmt"
	"os"
	"time"

	"github.com/uber/storagetapper/log"
	"github.com/uber/storagetapper/metrics"
	"github.com/uber/storagetapper/pipe"
	"github.com/uber/storagetapper/shutdown"
	"github.com/uber/storagetapper/snapshot"
	"github.com/uber/storagetapper/state"
	"github.com/uber/storagetapper/throttle"
)

// numRetries is the maximum number of times commitWithRetry attempts to
// commit a batch before giving up.
const numRetries = 5

// cancelCheckInterval is the period of the ticker created in
// streamFromConsistentSnapshot; every tick triggers snapshotTickHandler.
var cancelCheckInterval = 180 * time.Second

// streamBatch pulls messages from snReader and pushes them to outProducer
// until the batch limits from the output pipe config are reached or the
// reader has nothing more to fetch.
//
// The function may skip producing the last fetched message because of a
// partition key change, so that message is returned and has to be passed
// back in (key, pKey, outMsg) as the first message of the subsequent call.
//
// Return values, in order:
//   - whether at least one message was pushed (false with a nil error means
//     there was nothing to produce);
//   - number of bytes pushed;
//   - number of messages pushed;
//   - key, partition key and body of the held-over message (body is nil when
//     nothing is held over);
//   - the error that interrupted the batch, if any.
//
// If ticker is non-nil it is polled without blocking after every message and
// snapshotTickHandler runs on each tick; a handler error aborts the batch.
func (s *Streamer) streamBatch(snReader snapshot.Reader, outProducer pipe.Producer, key string, pKey string, outMsg []byte, ticker *time.Ticker, snapshotMetrics *metrics.Snapshot) (bool, int64, int64, string, string, []byte, error) {
	var i, b int // messages and bytes pushed in this batch
	var prevKey string
	var err error

	// Start() is evaluated now, Stop() when the function returns, so the
	// metric covers the whole call.
	defer snapshotMetrics.ReadLatency.Start().Stop()

	cfg := s.outPipe.Config()

	for outMsg != nil || snReader.FetchNext() {
		// Use the last message from the previous batch or fetch the next one.
		if outMsg == nil {
			key, pKey, outMsg, err = snReader.Pop()
			if log.EL(s.log, err) {
				return false, 0, 0, "", "", nil, err
			}
		}

		// Commit when the batch is full and a partition key different from
		// the previous one is fetched. This means that a batch can be a
		// little bigger than MaxBatchSize.
		//
		// The i > 0 guard guarantees progress: with a zero limit the
		// condition would otherwise hold before anything was pushed, the
		// function would report "nothing to produce" and the caller would
		// stop streaming while rows are still pending. For positive limits
		// the guard is implied by the size checks.
		if i > 0 && (i >= cfg.MaxBatchSize || b >= cfg.MaxBatchSizeBytes) && pKey != prevKey {
			break
		}

		if len(outMsg) == 0 {
			// Nothing to produce for this row; drop it and fetch the next.
			outMsg = nil
		} else {
			b += len(outMsg)
			key = outProducer.PartitionKey("snapshot", key)
			err = outProducer.PushBatch(key, outMsg)
			if log.EL(s.log, err) {
				return false, 0, 0, "", "", nil, err
			}
			i++
			outMsg = nil
			prevKey = pKey
		}

		if ticker != nil {
			select {
			case <-ticker.C:
				if err := s.snapshotTickHandler(snReader); err != nil {
					s.log.Warnf("Snapshot cancelled: %v", err)
					return false, 0, 0, "", "", nil, err
				}
			default:
			}
		}
	}

	// The message count is recorded through a duration-typed metric, hence
	// the conversion to milliseconds.
	snapshotMetrics.BatchSize.Record(time.Duration(i) * time.Millisecond)

	if i == 0 {
		return false, 0, 0, "", "", nil, nil
	}

	snapshotMetrics.BytesWritten.Inc(int64(b))
	snapshotMetrics.EventsWritten.Inc(int64(i))

	return true, int64(b), int64(i), key, pKey, outMsg, nil
}

// commitWithRetry commits the current batch of outProducer, making up to
// numRetries attempts. It returns true as soon as an attempt succeeds and
// false, after logging the last error, when all attempts fail.
func (s *Streamer) commitWithRetry(outProducer pipe.Producer, snapshotMetrics *metrics.Snapshot) bool {
	var err error

	defer snapshotMetrics.ProduceLatency.Start().Stop()

	for i := 0; i < numRetries; i++ {
		if err = outProducer.PushBatchCommit(); err == nil {
			return true
		}
		// Use the streamer's own logger, like the rest of this file, and
		// include the cause so every failed attempt can be diagnosed.
		s.log.Warnf("Batch commit attempt %v of %v failed: %v", i+1, numRetries, err)
	}

	log.EL(s.log, err)

	return false
}

// pushSchema encodes the schema with the output encoder and pushes it to
// outProducer. A nil encoded schema is treated as "nothing to push" and
// reported as success. It returns false when encoding or pushing fails.
func (s *Streamer) pushSchema(outProducer pipe.Producer) bool {
	outMsg, err := s.outEncoder.EncodeSchema(0)
	if log.EL(s.log, err) {
		return false
	}

	if outMsg == nil {
		return true
	}

	key := outProducer.PartitionKey("snapshot", "schema")
	err = outProducer.PushSchema(key, outMsg)

	return !log.EL(s.log, err)
}

// snapshotTickHandler verifies that the snapshot should keep running: the
// table must still be registered in state, still be flagged as needing a
// snapshot, and both the table lock and the cluster lock must be refreshed
// successfully. A non-nil error tells the caller to cancel the snapshot.
//
// snReader is currently unused.
func (s *Streamer) snapshotTickHandler(snReader snapshot.Reader) error {
	// A failed state lookup is reported as such instead of being presented
	// as "table removed" / "snapshot not required".
	// NOTE(review): assumes the second return value of
	// TableRegisteredInState is an error - confirm against the state package.
	reg, err := state.TableRegisteredInState(s.row.ID)
	if err != nil {
		return fmt.Errorf("checking table registration: %v", err)
	}
	if !reg {
		return fmt.Errorf("table removed from ingestion")
	}

	ns, err := state.GetNeedSnapshotFlag(s.row.ID)
	if err != nil {
		return fmt.Errorf("reading need-snapshot flag: %v", err)
	}
	if !ns {
		return fmt.Errorf("snapshot not required for the table")
	}

	if !state.RefreshTableLock(s.row.ID, s.workerID) {
		return fmt.Errorf("lost the table lock")
	}

	if !s.clusterLock.Refresh() {
		return fmt.Errorf("lost the cluster lock")
	}

	return nil
}

// streamLoop repeatedly produces and commits batches until the reader is
// exhausted, shutdown is initiated, or an error occurs. After each committed
// batch it sleeps for the larger of the delays advised by the two throttlers
// and polls ticker (when non-nil) to run snapshotTickHandler.
//
// It returns false on any batch, commit or tick-handler failure and true
// otherwise, including when the loop stops because of shutdown; the caller
// has to check shutdown.Initiated() itself.
func (s *Streamer) streamLoop(snReader snapshot.Reader, outProducer pipe.Producer, iopsThrottler, mbThrottler *throttle.Throttle, ticker *time.Ticker, snapshotMetrics *metrics.Snapshot) bool {
	var err error
	var next bool
	var nBytes, nEvents int64
	// Held-over message carried from one streamBatch call to the next.
	var key, pKey string
	var outMsg []byte

	for !shutdown.Initiated() {
		next, nBytes, nEvents, key, pKey, outMsg, err = s.streamBatch(snReader, outProducer, key, pKey, outMsg, ticker, snapshotMetrics)
		if err != nil {
			return false
		}

		if !next {
			break
		}

		if !s.commitWithRetry(outProducer, snapshotMetrics) {
			return false
		}

		// Both advices are used as microsecond delays; honour the longer one.
		c := iopsThrottler.Advice(nEvents)
		m := mbThrottler.Advice(nBytes)
		if m > c {
			c = m
		}

		if c != 0 {
			time.Sleep(time.Microsecond * time.Duration(c))
			snapshotMetrics.ThrottledUs.Inc(c)
		}

		// Same nil guard as in streamBatch, so that a nil ticker disables
		// the periodic check instead of panicking on ticker.C.
		if ticker != nil {
			select {
			case <-ticker.C:
				if err := s.snapshotTickHandler(snReader); err != nil {
					s.log.Warnf("Snapshot cancelled: %v", err)
					return false
				}
			default:
			}
		}
	}

	return true
}

// streamFromConsistentSnapshot creates a producer for the streamer's topic,
// pushes the schema, starts the snapshot reader and streams its messages,
// encoded by the streamer's output encoder, to the producer.
//
// throttleMB and throttleIOPS configure the two throttlers consulted after
// every committed batch; the throttling is only logged as enabled when at
// least one of them is non-zero.
//
// On success the producer is closed and the need-snapshot flag of the table
// is cleared. On any failure, or when shutdown is initiated, the error metric
// is incremented, the producer is closed with CloseOnFailure and false is
// returned. If creating the producer fails with an "already exists" error the
// snapshot is considered done: the flag is cleared and true is returned.
func (s *Streamer) streamFromConsistentSnapshot(throttleMB int64, throttleIOPS int64) bool {
	success := false

	snapshotMetrics := metrics.NewSnapshotMetrics("", s.getTag())
	snapshotMetrics.NumWorkers.Inc()
	defer func() {
		snapshotMetrics.NumWorkers.Dec()
	}()

	startTime := time.Now()

	outProducer, err := s.outPipe.NewProducer(s.topic)
	if err != nil {
		if os.IsExist(err) {
			s.log.Infof("Snapshot already exists")
			err = state.ClearNeedSnapshot(s.row.ID, s.row.SnapshottedAt)
			if err == nil {
				return true
			}
		}
		log.EL(s.log, err)
		snapshotMetrics.Errors.Inc(1)
		return false
	}

	outProducer.SetFormat(s.row.OutputFormat)

	// Registered only after the producer exists; covers every failure
	// return below.
	defer func() {
		if !success {
			snapshotMetrics.Errors.Inc(1)
			log.EL(s.log, outProducer.CloseOnFailure())
		}
	}()

	s.log.Infof("Starting consistent snapshot streamer for: %v, %v", s.topic, s.outEncoder.Type())

	// For JSON format push schema as a first message of the stream.
	if !s.pushSchema(outProducer) {
		return false
	}

	snReader, err := snapshot.Start(s.row.Input, s.row.Service, s.row.Cluster, s.row.DB, s.row.Table, s.row.Params, s.outEncoder, snapshotMetrics)
	if log.EL(s.log, err) {
		return false
	}
	defer snReader.End()

	iopsThrottler := throttle.New(throttleIOPS, 1000000, 3)
	defer iopsThrottler.Close()

	// throttleMB is given in megabytes; the throttler is fed byte counts.
	mbThrottler := throttle.New(throttleMB*1024*1024, 1000000, 3)
	defer mbThrottler.Close()

	if throttleIOPS != 0 || throttleMB != 0 {
		s.log.Debugf("Snapshot throttle enabled: %v IOPS, %v MBs", throttleIOPS, throttleMB)
	}

	ticker := time.NewTicker(cancelCheckInterval)
	defer ticker.Stop()

	if !s.streamLoop(snReader, outProducer, iopsThrottler, mbThrottler, ticker, snapshotMetrics) {
		return false
	}

	// streamLoop also returns true when it stops because of shutdown; an
	// interrupted snapshot must not be finalized.
	if shutdown.Initiated() {
		return false
	}

	if err = outProducer.Close(); err == nil {
		err = state.ClearNeedSnapshot(s.row.ID, s.row.SnapshottedAt)
		snapshotMetrics.Duration.Record(time.Since(startTime))
		snapshotMetrics.SizeRead.Set(snapshotMetrics.BytesRead.Get())
		snapshotMetrics.SizeWritten.Set(snapshotMetrics.BytesWritten.Get())
		if err == nil {
			success = true
		}
	}

	return !log.EL(s.log, err)
}