streamer/buffer.go

// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package streamer

import (
	"fmt"
	"strconv"
	"time"

	"github.com/pkg/errors"

	"github.com/uber/storagetapper/encoder"
	"github.com/uber/storagetapper/log"
	"github.com/uber/storagetapper/pipe"
	"github.com/uber/storagetapper/shutdown"
	"github.com/uber/storagetapper/state"
	"github.com/uber/storagetapper/types"
)

var batchCommitInterval = 1 * time.Second

// getTag returns the metrics tags identifying the table this streamer serves.
func (s *Streamer) getTag() map[string]string {
	return map[string]string{
		"table":   s.row.Table,
		"db":      s.row.DB,
		"cluster": s.row.Cluster,
		"input":   s.row.Input,
		"version": strconv.Itoa(s.row.Version),
	}
}

// encodeCommonFormat unwraps a buffered event and re-encodes it into the output
// format, returning the partition key and the encoded message. Schema events are
// pushed to the producer directly and return a nil outMsg.
func (s *Streamer) encodeCommonFormat(outProducer pipe.Producer, data []byte) (key string, outMsg []byte, err error) {
	cfEvent := &types.CommonFormatEvent{}
	payload, err := s.envEncoder.UnwrapEvent(data, cfEvent)
	if log.EL(s.log, err) {
		log.Errorf("broken event: %v %v", data, len(data))
		return
	}

	s.metrics.TimeInBuffer.Record(time.Since(time.Unix(0, cfEvent.Timestamp)))
	//	log.Debugf("common format received %v %v", cfEvent, cfEvent.Fields)

	if cfEvent.Type == "insert" || cfEvent.Type == "delete" || cfEvent.Type == "schema" {
		outMsg, err = s.outEncoder.CommonFormat(cfEvent)
		if log.EL(s.log, err) {
			return
		}

		key = encoder.GetCommonFormatKey(cfEvent)

		if cfEvent.Type == "schema" && outMsg != nil {
			key = outProducer.PartitionKey("log", key)
			err = outProducer.PushSchema(key, outMsg)
			log.EL(s.log, err)
			outMsg = nil
			return
		}
	} else if cfEvent.Type == s.row.OutputFormat {
		outMsg = payload
		key = cfEvent.Key[0].(string)
		if key == "" {
			err = s.outEncoder.UpdateCodec()
			if log.EL(s.log, err) {
				return
			}
		}
		//	log.Debugf("Data in final format already. Forwarding. Key=%v, SeqNo=%v", key, cfEvent.SeqNo)
	} else if cfEvent.Type == s.envEncoder.Type() {
		var ev *types.CommonFormatEvent
		ev, err = s.envEncoder.DecodeEvent(payload)
		if log.EL(s.log, err) {
			return
		}
		outMsg, err = s.outEncoder.CommonFormat(ev)
		if log.EL(s.log, err) {
			return
		}
		key = encoder.GetCommonFormatKey(ev)
	} else {
		err = fmt.Errorf("unsupported conversion from: %v to %v", cfEvent.Type, s.row.OutputFormat)
	}

	return
}

// produceEvent encodes a single buffered message and adds it to the output
// producer's current batch.
func (s *Streamer) produceEvent(outProducer pipe.Producer, data interface{}) error {
	var err error
	var outMsg []byte
	var key string

	// FIXME: We currently support only raw messages from local pipe or
	// CommonFormat messages
	switch m := data.(type) {
	case *types.RowMessage:
		key = m.Key
		outMsg, err = s.outEncoder.Row(m.Type, m.Data, m.SeqNo, m.Timestamp)
	case []byte:
		if len(m) == 0 { // Kafka may return empty messages, skip them
			return nil
		}
		s.BytesRead += int64(len(m))
		key, outMsg, err = s.encodeCommonFormat(outProducer, m)
	}

	if err != nil {
		return err
	}

	/* Schema events skipped */
	if outMsg == nil {
		return nil
	}

	key = outProducer.PartitionKey("log", key)
	err = outProducer.PushBatch(key, outMsg)
	log.EL(s.log, err)
	if err == nil {
		s.BytesWritten += int64(len(outMsg))
	}

	return err
}

/*
//message passed from fetcher to the main loop
type result struct {
	data    interface{}
	err     error
	hasNext bool
}
*/

// commitBatch flushes the pending output batch and records batch metrics.
// It returns false if the commit failed.
func (s *Streamer) commitBatch(outProducer pipe.Producer, numEvents int64) bool {
	s.metrics.EventsRead.Inc(numEvents)
	s.metrics.EventsWritten.Inc(numEvents)
	s.metrics.BatchSize.Record(time.Duration(numEvents) * time.Millisecond)
	s.metrics.BytesWritten.Set(s.BytesWritten)
	s.metrics.BytesRead.Set(s.BytesRead)

	s.metrics.ProduceLatency.Start()
	err := outProducer.PushBatchCommit()
	s.metrics.ProduceLatency.Stop()
	if err != nil {
		s.log.Errorf(errors.Wrap(err, "Failed to commit batch").Error())
		return false
	}

	return true
}

// bufferTickHandler runs on every state-update tick. It refreshes the table
// lock, re-checks the table state, and persists the consumer offset.
// It returns false if the worker should exit.
func (s *Streamer) bufferTickHandler(consumer pipe.Consumer) bool {
	s.metrics.NumWorkers.Emit()

	if !state.RefreshTableLock(s.row.ID, s.workerID) {
		s.metrics.LockLost.Inc(1)
		s.log.Debugf("Lost table lock")
		return false
	}

	tblInfo, err := state.GetTableByID(s.row.ID)
	if log.EL(s.log, err) {
		return false
	}
	if tblInfo == nil {
		s.log.Debugf("Table removed from ingestion")
		return false
	}
	if (tblInfo.NeedSnapshot || tblInfo.SnapshotTimeChanged(s.row.SnapshottedAt) || tblInfo.TimeForSnapshot(time.Now())) && !s.row.Params.NoSnapshot {
		s.log.Debugf("Table needs snapshot")
		return false
	}

	// Guarantee that we can lose no more than state_update_interval seconds
	if err := consumer.SaveOffset(); err != nil {
		s.log.Errorf("Error persisting pipe position")
		return false
	}

	return true
}

// closeConsumer closes the consumer, persisting offsets only when the worker
// exited cleanly.
func closeConsumer(consumer pipe.Consumer, saveOffsets bool, l log.Logger) {
	if saveOffsets {
		log.EL(l, consumer.Close())
	} else {
		log.EL(l, consumer.CloseOnFailure())
	}
}

// streamTable attempts to acquire a lock on a table partition and streams events from
// that table partition while periodically updating its state of the last Kafka offset consumed.
func (s *Streamer) streamTable(consumer pipe.Consumer) bool {
	var saveOffsets = true

	outProducer, err := s.outPipe.NewProducer(s.topic)
	if log.E(err) {
		return false
	}

	outProducer.SetFormat(s.row.OutputFormat)

	defer func() {
		closeConsumer(consumer, saveOffsets, s.log)
		log.EL(s.log, outProducer.Close())
	}()

	stateUpdateTick := time.NewTicker(s.stateUpdateInterval)
	defer stateUpdateTick.Stop()

	batchCommitTick := time.NewTicker(batchCommitInterval)
	defer batchCommitTick.Stop()

	s.log.Debugf("Beginning to stream from buffer")

	var numEvents int64
	var prevBytesWritten = s.BytesWritten

	for !shutdown.Initiated() {
		select {
		case <-stateUpdateTick.C:
			if !s.bufferTickHandler(consumer) {
				return true
			}
		case msg := <-consumer.Message():
			if msg == nil {
				s.log.Debugf("End of message stream")
				return true
			}
			if err := s.produceEvent(outProducer, msg); err != nil {
				s.log.Errorf(errors.Wrap(err, "Failed to produce message").Error())
				saveOffsets = false
				return false
			}
			numEvents++
			numBytes := s.BytesWritten - prevBytesWritten
			// Commit the batch if we have reached batch size
			if numEvents >= int64(s.outPipe.Config().MaxBatchSize) || numBytes >= int64(s.outPipe.Config().MaxBatchSizeBytes) {
				if !s.commitBatch(outProducer, numEvents) {
					saveOffsets = false
					return false
				}
				numEvents = 0
				prevBytesWritten = s.BytesWritten
			}
		case err := <-consumer.Error():
			s.log.Errorf(errors.Wrap(err, "Failed to fetch next message").Error())
			return false
		case <-batchCommitTick.C:
			// It's time to commit the batch, let's emit metrics as well
			if !s.commitBatch(outProducer, numEvents) {
				saveOffsets = false
				return false
			}
			numEvents = 0
			prevBytesWritten = s.BytesWritten
		case <-shutdown.InitiatedCh():
		}
	}

	return true
}
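Illustrative sketch (not part of the file above): streamTable flushes its output batch when any of three triggers fires — the event count reaches MaxBatchSize, the bytes written since the last commit reach MaxBatchSizeBytes, or the batchCommitInterval ticker elapses. The standalone snippet below distills only that decision; the batch type and shouldFlush helper are hypothetical names introduced for illustration and assume nothing about the storagetapper APIs beyond what the listing shows.

// Standalone sketch of the flush policy used in streamTable.
package main

import (
	"fmt"
	"time"
)

// batch is a hypothetical stand-in for the counters streamTable keeps between commits.
type batch struct {
	events int64     // events produced since the last commit
	bytes  int64     // bytes written since the last commit
	last   time.Time // time of the last commit
}

// shouldFlush mirrors the three triggers in streamTable: event-count cap,
// byte cap, and the periodic batch-commit tick.
func shouldFlush(b batch, maxEvents, maxBytes int64, interval time.Duration, now time.Time) bool {
	return b.events >= maxEvents || b.bytes >= maxBytes || now.Sub(b.last) >= interval
}

func main() {
	b := batch{events: 3, bytes: 1 << 20, last: time.Now().Add(-2 * time.Second)}
	// true here: neither size cap is hit, but the commit interval has elapsed
	fmt.Println(shouldFlush(b, 256, 32<<20, time.Second, time.Now()))
}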