datasource/kafka/kafka.go (89 lines of code) (raw):
package kafka
import (
"context"
"fmt"
"github.com/alibaba/pairec/v2/log"
"github.com/alibaba/pairec/v2/recconf"
kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/compress"
"strings"
)
type KafkaLog struct {
}
func (l *KafkaLog) Errorf(msg string, args ...interface{}) {
log.Error(fmt.Sprintf("msg=%s, args=%v", msg, args))
}
type KafkaProducer struct {
BootstrapServers string
Topic string
Producer *kafka.Writer
}
var kafkaProducerInstances = make(map[string]*KafkaProducer)
func GetKafkaProducer(name string) (*KafkaProducer, error) {
if _, ok := kafkaProducerInstances[name]; !ok {
return nil, fmt.Errorf("KafkaProducer not found, name:%s", name)
}
return kafkaProducerInstances[name], nil
}
func NewKafkaProducer(bootstrapServers, topic string) *KafkaProducer {
p := &KafkaProducer{
BootstrapServers: bootstrapServers,
Topic: topic,
}
return p
}
func (k *KafkaProducer) Init() error {
l := &KafkaLog{}
w := kafka.Writer{
Addr: kafka.TCP(strings.Split(k.BootstrapServers, ",")...),
Topic: k.Topic,
Balancer: kafka.CRC32Balancer{},
MaxAttempts: 3,
Async: true,
BatchBytes: 1048576 * 4,
ErrorLogger: kafka.LoggerFunc(l.Errorf),
Compression: compress.Snappy,
}
k.Producer = &w
return nil
}
func (k *KafkaProducer) SendMessage(message []byte) {
err := k.Producer.WriteMessages(context.Background(),
kafka.Message{
Value: message,
})
if err != nil {
log.Error(fmt.Sprintf("error=kafka write message(%v)", err))
}
}
func (k *KafkaProducer) SendMessages(module []byte, message []byte) {
err := k.Producer.WriteMessages(context.Background(),
kafka.Message{
Key: module,
Value: message,
})
if err != nil {
log.Error(fmt.Sprintf("error=kafka write message(%v)", err))
}
}
func (k *KafkaProducer) Close() {
if k.Producer != nil {
k.Producer.Close()
}
}
func Load(config *recconf.RecommendConfig) {
for name, conf := range config.KafkaConfs {
if _, ok := kafkaProducerInstances[name]; ok {
continue
}
m := &KafkaProducer{
BootstrapServers: conf.BootstrapServers,
Topic: conf.Topic,
}
err := m.Init()
if err != nil {
panic(err)
}
kafkaProducerInstances[name] = m
}
}