tunnel/kafka/reader.go (47 lines of code) (raw):
package kafka
import (
"github.com/Shopify/sarama"
)
// Reader consumes a single Kafka partition and re-exposes its records as a
// stream of *Message values on an unbuffered channel.
type Reader struct {
brokers []string // broker addresses parsed from the connection string
topic string // topic this reader is bound to
partition int32 // fixed partition consumed (defaultPartition at construction)
partitionConsumer sarama.PartitionConsumer // underlying sarama partition stream
messageChannel chan *Message // delivery channel returned by Read
}
// NewReader parses address into a topic and broker list, connects a sarama
// consumer, and starts a background goroutine that forwards records from the
// default partition onto the channel returned by Read.
//
// Pay attention: we fetch data from the oldest offset when starting by
// default, so a lot of data will be replayed when the receiver restarts.
func NewReader(address string) (*Reader, error) {
	topic, brokers, err := parse(address)
	if err != nil {
		return nil, err
	}
	consumer, err := sarama.NewConsumer(brokers, nil)
	if err != nil {
		return nil, err
	}
	partitionConsumer, err := consumer.ConsumePartition(topic, defaultPartition, sarama.OffsetOldest)
	if err != nil {
		// Close the parent consumer so its broker connections are not
		// leaked when the partition cannot be consumed.
		consumer.Close()
		return nil, err
	}
	r := &Reader{
		brokers:           brokers,
		topic:             topic,
		partition:         defaultPartition,
		partitionConsumer: partitionConsumer,
		messageChannel:    make(chan *Message),
	}
	go r.send()
	return r, nil
}
// Read returns the channel on which consumed Kafka records are delivered.
// The channel is fed by the background send goroutine started in NewReader.
func (r *Reader) Read() chan *Message {
return r.messageChannel
}
// send pumps records from the partition consumer into messageChannel,
// converting each sarama.ConsumerMessage into the tunnel's *Message.
// When the underlying message stream is closed, messageChannel is closed
// as well so that callers ranging over Read() terminate instead of
// blocking forever.
func (r *Reader) send() {
	// As the sole sender, this goroutine owns the close of messageChannel.
	defer close(r.messageChannel)
	for msg := range r.partitionConsumer.Messages() {
		r.messageChannel <- &Message{
			Key:       msg.Key,
			Value:     msg.Value,
			Offset:    msg.Offset,
			TimeStamp: msg.Timestamp,
		}
	}
}