in tools/cmd/write-load/main.go [23:126]
func main() {
var (
dataFile, database, target string
verbose, dryRun bool
concurrency int
batchSize, metrics, cardinality int
totalSamples int64
)
flag.BoolVar(&verbose, "verbose", false, "Verbose logging")
flag.StringVar(&dataFile, "data-file", "", "Data file input created from data-gen utility")
flag.StringVar(&target, "target", "", "Remote write URL")
flag.IntVar(&concurrency, "concurrency", 1, "Concurrent writers")
flag.StringVar(&database, "database", "FakeDatabase", "Database name")
flag.IntVar(&batchSize, "batch-size", 2500, "Batch size of requests")
flag.IntVar(&metrics, "metrics", 100, "Numbe of distinct metrics")
flag.IntVar(&cardinality, "cardinality", 100000, "Total cardinality per metric")
flag.BoolVar(&dryRun, "dry-run", false, "Read data but don't send it")
flag.Int64Var(&totalSamples, "total-samples", 0, "Total samples to send (0 = continuous)")
flag.Parse()
if database == "" {
logger.Fatalf("database is required")
}
var dataGen generator
generator := &continuousDataGenerator{
set: data.NewDataSet(data.SetOptions{
NumMetrics: metrics,
Cardinality: cardinality,
Database: database,
}),
startTime: time.Now().UTC(),
batchSize: batchSize,
totalSamples: totalSamples,
}
dataGen = generator
if dataFile != "" {
r, err := data.NewFileReader(dataFile)
if err != nil {
logger.Fatalf("open file: %s", err)
}
defer r.Close()
dataGen = r
}
batches := make(chan *prompb.WriteRequest, 1000)
stats := newStats()
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
writer(ctx, target, stats, batches)
}()
}
go reportStats(ctx, stats, batches)
stats.StartTimer()
for {
var err error
batch, err := dataGen.Read()
if err == io.EOF {
break
} else if err != nil {
fmt.Println(err.Error())
return
}
if !dryRun {
batches <- batch
} else {
for _, ts := range batch.Timeseries {
stats.IncTotalSent(len(ts.Samples))
}
}
stats.IncTotalSeries(len(batch.Timeseries))
}
for len(batches) > 0 {
println(len(batches))
time.Sleep(100 * time.Millisecond)
}
stats.StopTimer()
cancel()
wg.Wait()
fmt.Printf("Total Metrics: %d\n", stats.TotalMetrics())
fmt.Printf("Total Samples: %d\n", stats.TotalSeries())
fmt.Printf("Duration: %s\n", stats.TimerDuration().String())
fmt.Printf("Load samples per/sec: %0.2f\n", float64(stats.TotalSeries())/stats.TimerDuration().Seconds())
}