pipe/kafka.go (453 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package pipe import ( "database/sql" "fmt" "golang.org/x/net/context" //"context" "strings" "sync" "github.com/Shopify/sarama" "github.com/uber/storagetapper/config" "github.com/uber/storagetapper/log" "github.com/uber/storagetapper/types" "github.com/uber/storagetapper/util" ) //Initial offset type var ( OffsetOldest = sarama.OffsetOldest OffsetNewest = sarama.OffsetNewest ) //offsetPersistInterval determines how often offsets will be persisted, //meaning at most offsetPersistInterval messages will be resent //after failure restart //At the moment of when we persist offset, we are pesisting not current offset //but current offset minus batchSize, to guarantee resending at least batchSize //last messages after failure restart var offsetPersistInterval int64 = 10000 //InitialOffset allows to configure global initial offset from which to start //consuming partitions which doesn't have offsets stored in the kafka_offsets table var InitialOffset = OffsetNewest //KafkaConfig global per process Sarama config var KafkaConfig *sarama.Config type kafkaPartition struct { id int32 offset int64 savedOffset int64 consumer sarama.PartitionConsumer childConsumer chan *sarama.ConsumerMessage nextMsg *sarama.ConsumerMessage } type topicConsumer struct { partitions []kafkaPartition consumers []chan *sarama.ConsumerMessage cancel context.CancelFunc wg sync.WaitGroup } // KafkaPipe is wrapper on top of Sarama library to produce/consume through kafka // * after failure shutdown pipe guarantees to resent last batchSize messages, //meaning batchSize messages may be in flight, reading (batchSize+1)th message //automatically acknowledges previous batch. // * producer caches and sends maximum batchSize messages at once type KafkaPipe struct { cfg config.PipeConfig conn *sql.DB saramaConsumer sarama.Consumer consumers map[string]*topicConsumer lock sync.RWMutex //protects consumers map, which can be modified by concurrent NewConsumer/closeConsumer } // kafkaProducer synchronously pushes messages to Kafka using topic specified during producer creation type kafkaProducer struct { topic string producer sarama.SyncProducer batch []*sarama.ProducerMessage batchPtr int log log.Logger } // kafkaConsumer consumes messages from Kafka using topic and partition specified during consumer creation type kafkaConsumer struct { baseConsumer pipe *KafkaPipe topic string ch chan *sarama.ConsumerMessage log log.Logger } func init() { registerPlugin("kafka", initKafkaPipe) } func initKafkaPipe(cfg *config.PipeConfig, db *sql.DB) (Pipe, error) { return &KafkaPipe{conn: db, cfg: *cfg}, nil } // Type returns Pipe type as Kafka func (p *KafkaPipe) Type() string { return "kafka" } // Config returns pipe configuration func (p *KafkaPipe) Config() *config.PipeConfig { return &p.cfg } // Close release resources associated with the pipe func (p *KafkaPipe) Close() error { if p.saramaConsumer == nil { return nil } return p.saramaConsumer.Close() } // Init initializes Kafka pipe creating kafka_offsets table func (p *KafkaPipe) Init() error { var err error var cfg *sarama.Config if KafkaConfig != nil { cfg = KafkaConfig } else { cfg = sarama.NewConfig() cfg.ClientID = types.MySvcName } p.consumers = make(map[string]*topicConsumer) p.saramaConsumer, err = sarama.NewConsumer(p.cfg.Kafka.Addresses, cfg) if log.E(err) { return err } if p.conn == nil { log.Warnf("No DB configured, offset won't be persisted") return nil } err = util.ExecSQL(p.conn, `CREATE TABLE IF NOT EXISTS `+types.MyDBName+`.kafka_offsets ( topic VARCHAR(255) CHARACTER SET utf8 NOT NULL, partitionId INT NOT NULL DEFAULT 0, offset BIGINT NOT NULL DEFAULT 0, PRIMARY KEY(topic, partitionId))`) if log.E(err) { return err } return nil } //NewProducer registers a new sync producer func (p *KafkaPipe) NewProducer(topic string) (Producer, error) { l := log.WithFields(log.Fields{"topic": topic}) producer, err := sarama.NewSyncProducer(p.cfg.Kafka.Addresses, p.producerConfig()) if log.EL(l, err) { return nil, err } return &kafkaProducer{topic, producer, make([]*sarama.ProducerMessage, p.cfg.MaxBatchSize), 0, l}, nil } func (p *KafkaPipe) producerConfig() *sarama.Config { if KafkaConfig != nil { return KafkaConfig } cfg := sarama.NewConfig() cfg.Producer.Return.Successes = true cfg.Producer.RequiredAcks = sarama.WaitForAll cfg.Producer.MaxMessageBytes = p.cfg.Kafka.MaxMessageBytes cfg.ClientID = types.MySvcName return cfg } //TODO: Think about moving offsets handling SQL to state package func (p *KafkaPipe) getOffsets(topic string) (map[int32]kafkaPartition, error) { res := make(map[int32]kafkaPartition) if p.conn == nil { return res, nil } rows, err := util.QuerySQL(p.conn, "SELECT partitionId, offset FROM kafka_offsets WHERE topic=?", topic) if err != nil { return nil, err } defer func() { log.E(rows.Close()) }() var r kafkaPartition for rows.Next() { if err := rows.Scan(&r.id, &r.offset); err == nil { if r.offset < 0 { r.offset = 0 } res[r.id] = r } } if err := rows.Err(); err != nil { return nil, err } return res, nil } //pushPartitionMsg pushes message to partition consumer, while waiting for //cancellation event also func (p *KafkaPipe) pushPartitionMsg(ctx context.Context, ch chan *sarama.ConsumerMessage, t *kafkaPartition) bool { select { case ch <- t.nextMsg: t.nextMsg = nil case <-ctx.Done(): return false } return true } /*Redistribute topic partitions amongst topic consumers */ func (p *KafkaPipe) redistributeConsumers(c *topicConsumer) { if len(c.consumers) == 0 { return } /*Stop currently running consumers and wait till they terminate*/ if c.cancel != nil { // can be nil when called for the first time c.cancel() } c.wg.Wait() var nparts = len(c.partitions) log.Debugf("Distributing %v partition(s) onto %v consumer(s)", nparts, len(c.consumers)) ctx, cancel := context.WithCancel(context.Background()) c.cancel = cancel j := 0 partsPerConsumer := nparts / (len(c.consumers) - j) c.wg.Add(nparts) for i := 0; i < nparts; i++ { c.partitions[i].childConsumer = c.consumers[j] /* Partition 'i' messages goes to consumer 'j' */ go func(ctx context.Context, i int, outCh chan *sarama.ConsumerMessage) { defer c.wg.Done() if c.partitions[i].nextMsg != nil && !p.pushPartitionMsg(ctx, outCh, &c.partitions[i]) { return } for { select { // FIXME: Config kafka and handle errors. By default errors logged only and not returned by the channel /* case msg := <-c.partitions[i].consumer.Errors(): select { case c.consumers[j] <- msg: case <-c.ctx.Done(): return } */ case c.partitions[i].nextMsg = <-c.partitions[i].consumer.Messages(): //Copy data to our buffer. Key field is not used, so not copied b := make([]byte, len(c.partitions[i].nextMsg.Value)) copy(b, c.partitions[i].nextMsg.Value) c.partitions[i].nextMsg.Value = b if !p.pushPartitionMsg(ctx, outCh, &c.partitions[i]) { return } case <-ctx.Done(): return } } }(ctx, i, c.consumers[j]) log.Debugf("Started consumer, partition=%d, push to channel=%d", i, j) /*Try our best to equally redistribute work */ if (nparts-i-1)%partsPerConsumer == 0 { j++ if len(c.consumers) != j { partsPerConsumer = (nparts - i - 1) / (len(c.consumers) - j) } } } } func (p *KafkaPipe) initTopicConsumer(topic string, l log.Logger) error { log.Debugf("initTopic consumer %v", topic) c := &topicConsumer{} /*Initialize topic partitions on first registered consumer*/ parts, err := p.saramaConsumer.Partitions(topic) if log.EL(l, err) { return err } offsets, err := p.getOffsets(topic) if log.EL(l, err) { return err } for _, i := range parts { o := InitialOffset if v, ok := offsets[i]; ok { o = v.offset } log.Debugf("start consuming partition %v from offset %v for topic %v", i, o, topic) pc, err := p.saramaConsumer.ConsumePartition(topic, i, o) if log.EL(l, err) { return err } c.partitions = append(c.partitions, kafkaPartition{i, InitialOffset, InitialOffset, pc, nil, nil}) } p.consumers[topic] = c return nil } //NewConsumer registers a new kafka consumer func (p *KafkaPipe) NewConsumer(topic string) (Consumer, error) { l := log.WithFields(log.Fields{"topic": topic}) l.Debugf("Registering consumer") p.lock.Lock() defer p.lock.Unlock() /*First consumer - initialize map, kafka consumer and state*/ if p.saramaConsumer == nil { if err := p.Init(); err != nil { return nil, err } } /*First consumer of this topic. Initialize*/ if c := p.consumers[topic]; c == nil { err := p.initTopicConsumer(topic, l) if err != nil { return nil, err } } if len(p.consumers[topic].consumers) >= len(p.consumers[topic].partitions) { return nil, fmt.Errorf("number of consumers(%v) should be less or eqaul to the number of partitions(%v)", len(p.consumers[topic].consumers)+1, len(p.consumers[topic].partitions)) } //FIXME: Can't use buffered channel here, because in the case, when one of //the consumer closes, messages, which are in the buffer, will be lost ch := make(chan *sarama.ConsumerMessage) //ch := make(chan *sarama.ConsumerMessage, p.batchSize) p.consumers[topic].consumers = append(p.consumers[topic].consumers, ch) p.redistributeConsumers(p.consumers[topic]) kc := &kafkaConsumer{pipe: p, topic: topic, ch: ch, log: l} kc.initBaseConsumer(kc.fetchNext) log.Debugf("Registered consumer %v", topic) return kc, nil } func (p *KafkaPipe) commitOffset(topic string, partition int32, offset int64, persistInterval int64, l log.Logger) error { tc := p.consumers[topic] if p.conn == nil || tc == nil { return nil } //BUG: Wrong. Partition is id not an index tp := &tc.partitions[partition] if tp.offset == InitialOffset { tp.savedOffset = offset } tp.offset = offset if offset == InitialOffset { return nil } if persistInterval != 0 { offset = offset - int64(p.cfg.MaxBatchSize) + 1 } else { offset++ //graceful shutdown, all messages acked, start from next offset next time } if tp.offset-tp.savedOffset >= persistInterval { err := util.ExecSQL(p.conn, "INSERT INTO kafka_offsets VALUES(?,?,?) ON DUPLICATE KEY UPDATE offset=?", topic, partition, offset, offset) if log.EL(l, err) { return err } tp.savedOffset = tp.offset } return nil } func (p *kafkaConsumer) commitConsumerPartitionOffsets() error { var v *kafkaPartition for i := 0; i < len(p.pipe.consumers[p.topic].partitions); i++ { v = &p.pipe.consumers[p.topic].partitions[i] if v.childConsumer != p.ch { continue } if err := p.pipe.commitOffset(p.topic, v.id, v.offset, 0, p.log); err != nil { return err } } return nil } //DeleteKafkaOffsets deletes Kafka offsets of all partitions of specified topic func DeleteKafkaOffsets(topic string, conn *sql.DB) error { log.Debugf("Deleting old Kafka topic offsets: %v", topic) if err := util.ExecSQL(conn, "DELETE FROM kafka_offsets WHERE topic=?", topic); err != nil { if !strings.Contains(err.Error(), "doesn't exist") { return err } } return nil } // closeConsumer closes Kafka consumer func (p *KafkaPipe) closeConsumer(kc *kafkaConsumer, graceful bool) error { p.lock.Lock() defer p.lock.Unlock() kc.log.Debugf("Closing consumer. graceful %v", graceful) if graceful { if err := kc.commitConsumerPartitionOffsets(); err != nil { return err } } t := p.consumers[kc.topic] c := t.consumers if len(c) > 1 { //FIXME: Avoid linear search below var i int for i = 0; i < len(c); i++ { if c[i] == kc.ch { break } } if i >= len(c) { return fmt.Errorf("consumer doesn't belong to the pipe") } /*Remove consumer by swapping with last element*/ c[i] = c[len(c)-1] t.consumers = c[:len(c)-1] p.redistributeConsumers(p.consumers[kc.topic]) } else { t.cancel() t.wg.Wait() for _, v := range t.partitions { err := v.consumer.Close() if log.EL(kc.log, err) { return err } } delete(p.consumers, kc.topic) kc.log.Debugf("Last consumer closed. Topic deleted") } return nil } //Push produces message to Kafka topic func (p *kafkaProducer) Push(in interface{}) error { _, _, err := p.pushLow("", in) return err } //PushK sends a keyed message to Kafka func (p *kafkaProducer) PushK(key string, in interface{}) error { _, _, err := p.pushLow(key, in) return err } func (p *kafkaProducer) pushLow(key string, in interface{}) (int32, int64, error) { var bytes []byte switch b := in.(type) { case []byte: bytes = b default: return 0, 0, fmt.Errorf("kafka pipe can handle binary arrays only") } if len(bytes) == 0 { log.WithFields(log.Fields{"topic": p.topic}).Warnf("Producing empty message to kafka") } msg := &sarama.ProducerMessage{Topic: p.topic, Value: sarama.ByteEncoder(bytes)} if key != "" { msg.Key = sarama.StringEncoder(key) } partition, offset, err := p.producer.SendMessage(msg) //partition, offset, err := p.producer.SendMessage(msg) log.EL(p.log, err) //log.Debugf("Message has been sent. Partition=%v. Offset=%v\n", partition, offset) return partition, offset, err } //PushBatch stashes a keyed message into batch which will be send to Kafka by //PushBatchCommit func (p *kafkaProducer) PushBatch(key string, in interface{}) error { var bytes []byte switch b := in.(type) { case []byte: bytes = b default: return fmt.Errorf("kafka pipe can handle binary arrays only") } if p.batch[p.batchPtr] == nil { p.batch[p.batchPtr] = new(sarama.ProducerMessage) } if len(bytes) == 0 { log.WithFields(log.Fields{"topic": p.topic}).Warnf("Producing empty message to kafka") } p.batch[p.batchPtr].Topic = p.topic p.batch[p.batchPtr].Key = sarama.StringEncoder(key) p.batch[p.batchPtr].Value = sarama.ByteEncoder(bytes) p.batchPtr++ if p.batchPtr >= len(p.batch) { p.batch = append(p.batch, make([]*sarama.ProducerMessage, 32)...) } return nil } //PushBatchCommit commits currently queued messages in the producer func (p *kafkaProducer) PushBatchCommit() error { if p.batchPtr == 0 { return nil } err := p.producer.SendMessages(p.batch[:p.batchPtr]) if !log.EL(p.log, err) { p.batchPtr = 0 } else { for _, m := range err.(sarama.ProducerErrors) { p.log.Errorf("%v", m.Error()) } } return err } func (p *kafkaProducer) PushSchema(key string, data []byte) error { return p.PushBatch(key, data) } // Close Kafka Producer func (p *kafkaProducer) Close() error { err := p.producer.Close() log.EL(p.log, err) return err } // CloseOnFailure Kafka Producer func (p *kafkaProducer) CloseOnFailure() error { err := p.producer.Close() log.EL(p.log, err) return err } //fetchNext fetches next message from Kafka and commits offset read func (p *kafkaConsumer) fetchNext() (interface{}, error) { select { case msg, ok := <-p.ch: if !ok { return nil, nil } p.pipe.lock.RLock() defer p.pipe.lock.RUnlock() err := p.pipe.commitOffset(msg.Topic, msg.Partition, msg.Offset, offsetPersistInterval, p.log) log.E(err) return msg.Value, err case <-p.ctx.Done(): } return nil, nil } //Close closes consumer func (p *kafkaConsumer) close(graceful bool) error { p.cancel() //Unblock FetchNext p.wg.Wait() return p.pipe.closeConsumer(p, graceful) } func (p *kafkaConsumer) Close() error { return p.close(true) } func (p *kafkaConsumer) CloseOnFailure() error { return p.close(false) } func (p *kafkaConsumer) SaveOffset() error { p.pipe.lock.Lock() defer p.pipe.lock.Unlock() return p.commitConsumerPartitionOffsets() } func (p *kafkaProducer) SetFormat(format string) { } //PartitionKey transform input row key into partition key func (p *kafkaProducer) PartitionKey(_ string, key string) string { return key } func (p *kafkaConsumer) SetFormat(format string) { } /* type saramaLogger struct { } var saramalogger saramaLogger func init() { sarama.Logger = &saramalogger } func (s *saramaLogger) Print(v ...interface{}) { log.Debugf("", v...) } func (s *saramaLogger) Printf(format string, v ...interface{}) { log.Debugf(format, v...) } func (s *saramaLogger) Println(v ...interface{}) { log.Debugf("", v...) } */