_examples/bulk/kafka/producer/producer.go (158 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package producer
import (
"bytes"
"context"
"fmt"
"math/rand"
"net"
"time"
"github.com/segmentio/kafka-go"
)
var (
sides = []string{"BUY", "SELL"}
symbols = []string{"KBCU", "KBCU", "KBCU", "KJPR", "KJPR", "KSJD", "KXCV", "WRHV", "WTJB", "WMLU"}
accounts = []string{"ABC123", "ABC123", "ABC123", "LMN456", "LMN456", "STU789"}
)
func init() {
rand.Seed(time.Now().UnixNano())
kafka.DefaultClientID = "go-elasticsearch-kafka-demo"
}
type Producer struct {
BrokerURL string
TopicName string
TopicParts int
MessageRate int
writer *kafka.Writer
startTime time.Time
totalMessages int64
totalErrors int64
totalBytes int64
}
func (p *Producer) Run(ctx context.Context) error {
var messages []kafka.Message
p.startTime = time.Now()
p.writer = kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{p.BrokerURL},
Topic: p.TopicName,
})
ticker := time.NewTicker(time.Second)
for {
select {
case t := <-ticker.C:
for i := 1; i <= p.MessageRate; i++ {
messages = append(messages, kafka.Message{Value: p.generateMessage(t)})
}
if err := p.writer.WriteMessages(ctx, messages...); err != nil {
messages = messages[:0]
return err
}
messages = messages[:0]
}
}
p.writer.Close()
ticker.Stop()
return nil
}
func (p *Producer) CreateTopic(ctx context.Context) error {
conn, err := net.Dial("tcp", p.BrokerURL)
if err != nil {
return err
}
return kafka.NewConn(conn, "", 0).CreateTopics(
kafka.TopicConfig{
Topic: p.TopicName,
NumPartitions: p.TopicParts,
ReplicationFactor: 1,
})
}
func (p *Producer) generateMessage(t time.Time) []byte {
var (
buf bytes.Buffer
timestamp time.Time
timeshift time.Duration
side string
quantity int
price int
amount int
symbol string
account string
)
timestamp = t
if timestamp.Minute() == 2 {
timeshift = -(time.Duration(time.Minute))
} else {
timeshift = time.Duration(time.Duration(rand.ExpFloat64()/2.0*100) * time.Second)
}
switch {
case timestamp.Minute()%5 == 0:
side = "SELL"
default:
if timestamp.Second()%3 == 0 {
side = "SELL"
} else {
side = "BUY"
}
}
switch {
case timestamp.Minute()%3 == 0:
quantity = rand.Intn(250) + 500
case timestamp.Second()%6 == 0:
quantity = rand.Intn(300) + 50
case timestamp.Second()%12 == 0:
quantity = rand.Intn(10) + 1
default:
quantity = rand.Intn(150) + 10
}
if side == "SELL" {
price = int(100.0 + 15.0*rand.NormFloat64())
} else {
price = int(250.0 + 50.0*rand.NormFloat64())
}
amount = quantity * price
if timestamp.Second()%4 == 0 {
symbol = "KXCV"
} else {
symbol = symbols[rand.Intn(len(symbols))]
}
if timestamp.Minute()%5 == 0 && timestamp.Second() > 30 {
account = "STU789"
} else {
account = accounts[rand.Intn(len(accounts))]
}
fmt.Fprintf(&buf,
`{"time":"%s", "symbol":"%s", "side":"%s", "quantity":%d, "price":%d, "amount":%d, "account":"%s"}`,
timestamp.UTC().Add(timeshift).Format(time.RFC3339),
symbol,
side,
quantity,
price,
amount,
account,
)
return buf.Bytes()
}
type Stats struct {
Duration time.Duration
TotalMessages int64
TotalErrors int64
TotalBytes int64
Throughput float64
}
func (p *Producer) Stats() Stats {
if p.writer == nil {
return Stats{}
}
duration := time.Since(p.startTime)
writerStats := p.writer.Stats()
p.totalMessages += writerStats.Messages
p.totalErrors += writerStats.Errors
p.totalBytes += writerStats.Bytes
rate := float64(p.totalMessages) / duration.Seconds()
return Stats{
Duration: duration,
TotalMessages: p.totalMessages,
TotalErrors: p.totalErrors,
TotalBytes: p.totalBytes,
Throughput: rate,
}
}