tools/cmd/write-load/main.go (206 lines of code) (raw):
package main
import (
"context"
"flag"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/prompb"
"github.com/Azure/adx-mon/pkg/promremote"
"github.com/Azure/adx-mon/tools/data"
)
// generator is the source of write requests consumed by main's send loop.
// Read returns the next batch to send, or io.EOF once the source is
// exhausted; any other error ends the run.
type generator interface {
	Read() (*prompb.WriteRequest, error)
}
// This program generates remote write load against a target endpoint.
//
// Batches come either from a file produced by the data-gen utility
// (-data-file) or from an in-process synthetic generator. They are fanned out
// to -concurrency writer goroutines, per-second progress is printed while the
// run is active, and a summary is printed at the end. Setup and read failures
// are reported through logger.Fatalf.
func main() {
	if err := run(); err != nil {
		logger.Fatalf("%s", err)
	}
}

// run holds the program body so deferred cleanup (closing the data file,
// cancelling the writer context) executes before main reports an error.
func run() error {
	var (
		dataFile, database, target      string
		verbose, dryRun                 bool
		concurrency                     int
		batchSize, metrics, cardinality int
		totalSamples                    int64
	)
	flag.BoolVar(&verbose, "verbose", false, "Verbose logging")
	flag.StringVar(&dataFile, "data-file", "", "Data file input created from data-gen utility")
	flag.StringVar(&target, "target", "", "Remote write URL")
	flag.IntVar(&concurrency, "concurrency", 1, "Concurrent writers")
	flag.StringVar(&database, "database", "FakeDatabase", "Database name")
	flag.IntVar(&batchSize, "batch-size", 2500, "Batch size of requests")
	flag.IntVar(&metrics, "metrics", 100, "Number of distinct metrics")
	flag.IntVar(&cardinality, "cardinality", 100000, "Total cardinality per metric")
	flag.BoolVar(&dryRun, "dry-run", false, "Read data but don't send it")
	flag.Int64Var(&totalSamples, "total-samples", 0, "Total samples to send (0 = continuous)")
	flag.Parse()

	if database == "" {
		return fmt.Errorf("database is required")
	}
	if !dryRun {
		// Without a target or at least one writer the send loop below would
		// retry forever or block once the queue fills.
		if target == "" {
			return fmt.Errorf("target is required unless -dry-run is set")
		}
		if concurrency < 1 {
			return fmt.Errorf("concurrency must be at least 1, got %d", concurrency)
		}
	}

	var dataGen generator
	if dataFile != "" {
		r, err := data.NewFileReader(dataFile)
		if err != nil {
			return fmt.Errorf("open file: %w", err)
		}
		defer r.Close()
		dataGen = r
	} else {
		// The synthetic data set is only built when it is actually used.
		if batchSize < 1 {
			return fmt.Errorf("batch-size must be at least 1, got %d", batchSize)
		}
		dataGen = &continuousDataGenerator{
			set: data.NewDataSet(data.SetOptions{
				NumMetrics:  metrics,
				Cardinality: cardinality,
				Database:    database,
			}),
			startTime:    time.Now().UTC(),
			batchSize:    batchSize,
			totalSamples: totalSamples,
		}
	}

	batches := make(chan *prompb.WriteRequest, 1000)
	stats := newStats()

	ctx, cancel := context.WithCancel(context.Background())
	// Ensures cancel runs on every return path, including read errors.
	defer cancel()

	var wg sync.WaitGroup
	if !dryRun {
		for i := 0; i < concurrency; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				writer(ctx, target, stats, batches)
			}()
		}
	}

	// Start the timer before launching the reporter so its reads of
	// StartTime happen after this write (the go statement orders them).
	stats.StartTimer()
	go reportStats(ctx, stats, batches)

	for {
		batch, err := dataGen.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return fmt.Errorf("read batch: %w", err)
		}

		if dryRun {
			for _, ts := range batch.Timeseries {
				stats.IncTotalSent(len(ts.Samples))
			}
		} else {
			batches <- batch
		}
		stats.IncTotalSeries(len(batch.Timeseries))
	}

	// Wait until the writers have taken every queued batch; batches already
	// received by a writer are awaited through wg below.
	for len(batches) > 0 {
		time.Sleep(100 * time.Millisecond)
	}
	cancel()
	wg.Wait()
	// Stop only after the writers return so in-flight writes are included.
	stats.StopTimer()

	fmt.Printf("Total Metrics: %d\n", stats.TotalMetrics())
	fmt.Printf("Total Samples: %d\n", stats.TotalSeries())
	fmt.Printf("Duration: %s\n", stats.TimerDuration().String())
	fmt.Printf("Load samples per/sec: %0.2f\n", float64(stats.TotalSeries())/stats.TimerDuration().Seconds())
	return nil
}
// reportStats prints one progress line per second until ctx is cancelled.
//
// Each line shows the samples sent since the previous tick, the average send
// rate since stats.StartTimer, the cumulative sent count and elapsed time,
// and the number of batches still queued next to the total series generated.
func reportStats(ctx context.Context, stats *stats, batches chan *prompb.WriteRequest) {
	t := time.NewTicker(time.Second)
	defer t.Stop()

	var lastTotal int64
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			// Snapshot the counter once: the delta, the average and the
			// baseline for the next tick must agree, otherwise samples sent
			// between separate reads are missing from every interval.
			total := stats.TotalSent()
			elapsed := stats.ElapsedDuration()
			fmt.Printf("Samples per/sec: %d (%0.2f samples/sec) (%d/%s) queued=%d/%d\n",
				total-lastTotal, float64(total)/elapsed.Seconds(), total, elapsed, len(batches), stats.TotalSeries())
			lastTotal = total
		}
	}
}
// writer sends batches received on ch to endpoint until ctx is cancelled or
// ch is closed, adding the samples of each delivered batch to stats.
//
// A failed write is retried once per second with no retry limit. Writes use
// a background context, so a batch already taken off ch is still delivered
// when ctx is cancelled during shutdown. Failure to build the client is fatal.
func writer(ctx context.Context, endpoint string, stats *stats, ch chan *prompb.WriteRequest) {
	cli, err := promremote.NewClient(promremote.ClientOpts{
		InsecureSkipVerify: true,
		Timeout:            30 * time.Second,
		Endpoint:           endpoint,
	})
	if err != nil {
		logger.Fatalf("prom client: %v", err)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case batch, ok := <-ch:
			if !ok {
				// A closed channel would otherwise yield nil batches forever.
				return
			}
			for {
				err := cli.Write(context.Background(), endpoint, batch)
				if err == nil {
					break
				}
				logger.Warnf("write failed: %s %s. Retrying...", endpoint, err)
				time.Sleep(1 * time.Second)
			}
			for _, ts := range batch.Timeseries {
				stats.IncTotalSent(len(ts.Samples))
			}
		}
	}
}
// continuousDataGenerator produces synthetic write requests from a data.Set.
//
// Each Read fills one request with up to batchSize entries from set.Next,
// all stamped with startTime, then advances startTime by one second. When
// totalSamples is positive, generation stops with io.EOF after that many
// entries; zero means unbounded. It holds no locks and is meant to be driven
// from a single goroutine.
type continuousDataGenerator struct {
	set          *data.Set
	startTime    time.Time // timestamp passed to set.Next for the next request
	batchSize    int       // entries per request
	totalSamples int64     // cap on entries produced; 0 = unbounded
	sent         int64     // entries produced so far
}

// Read implements generator. The final request is truncated so the number of
// entries produced never exceeds totalSamples. Each entry is counted as one
// sample (presumably one sample per series from set.Next — confirm).
func (c *continuousDataGenerator) Read() (*prompb.WriteRequest, error) {
	// A non-positive batch size would never advance sent and loop forever.
	if c.batchSize <= 0 {
		return nil, fmt.Errorf("batch size must be positive, got %d", c.batchSize)
	}

	n := int64(c.batchSize)
	if c.totalSamples > 0 {
		remaining := c.totalSamples - c.sent
		if remaining <= 0 {
			return nil, io.EOF
		}
		if remaining < n {
			n = remaining
		}
	}

	wr := &prompb.WriteRequest{}
	for i := int64(0); i < n; i++ {
		wr.Timeseries = append(wr.Timeseries, c.set.Next(c.startTime))
	}
	c.startTime = c.startTime.Add(time.Second)
	c.sent += n
	return wr, nil
}
// stats aggregates counters shared by the generator loop, the writer
// goroutines and the progress reporter. Counters are updated atomically and
// the metric-name set is guarded by mu, so the methods below are safe for
// concurrent use. StartTime and StopTime are plain fields: set them (via
// StartTimer/StopTimer) before sharing stats or after other goroutines stop.
type stats struct {
	// Keep the atomically accessed int64 fields first so they stay 64-bit
	// aligned on 32-bit platforms.
	totalSeries int64
	totalSent   int64

	mu      sync.Mutex
	metrics map[string]struct{}

	StartTime, StopTime time.Time
}

// newStats returns an empty stats with its metric set initialised.
func newStats() *stats {
	return &stats{
		metrics: map[string]struct{}{},
	}
}

// IncTotalSeries adds n to the number of series generated.
func (s *stats) IncTotalSeries(n int) {
	atomic.AddInt64(&s.totalSeries, int64(n))
}

// TotalSeries returns the number of series generated so far.
func (s *stats) TotalSeries() int64 {
	return atomic.LoadInt64(&s.totalSeries)
}

// RecordMetric adds name to the set of distinct metric names seen.
func (s *stats) RecordMetric(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.metrics[name] = struct{}{}
}

// TotalMetrics returns the number of distinct metric names recorded.
func (s *stats) TotalMetrics() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.metrics)
}

// StartTimer records the current time as the start of the run.
func (s *stats) StartTimer() {
	s.StartTime = time.Now()
}

// StopTimer records the current time as the end of the run.
func (s *stats) StopTimer() {
	s.StopTime = time.Now()
}

// ElapsedDuration returns the time since StartTimer was called.
func (s *stats) ElapsedDuration() time.Duration {
	return time.Since(s.StartTime)
}

// TimerDuration returns the span between StartTimer and StopTimer.
func (s *stats) TimerDuration() time.Duration {
	return s.StopTime.Sub(s.StartTime)
}

// IncTotalSent adds n to the number of samples sent.
func (s *stats) IncTotalSent(n int) {
	atomic.AddInt64(&s.totalSent, int64(n))
}

// TotalSent returns the number of samples sent so far.
func (s *stats) TotalSent() int64 {
	return atomic.LoadInt64(&s.totalSent)
}