tools/cmd/log-load/main.go (340 lines of code) (raw):
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"net/http"
"sync"
"sync/atomic"
"time"
v1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/collector/logs/v1"
commonv1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/common/v1"
otelv1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/logs/v1"
resourcev1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/resource/v1"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/tools/data"
"github.com/golang/protobuf/proto"
)
// generator is the source of log export batches consumed by main.
//
// Read returns the next request to send. main stops reading when Read returns
// io.EOF and aborts the program on any other non-nil error.
type generator interface {
Read() (*v1.ExportLogsServiceRequest, error)
}
// This program generates OTLP log export load against a target endpoint.
//
// Batches are produced by a continuousDataGenerator and pushed onto a buffered
// channel that is drained by -concurrency writer goroutines, each POSTing to
// <target>/v1/logs. With -dry-run the batches are only counted, never sent.
// When -total-samples is reached the queue is drained, the writers are stopped
// and a summary is printed.
func main() {
	var (
		target                          string
		verbose, dryRun                 bool
		concurrency                     int
		batchSize, metrics, cardinality int
		totalSamples                    int64
	)

	// NOTE: verbose is parsed but not consulted anywhere in this function, and
	// metrics/cardinality have no flags, so they are always passed as zero.
	flag.BoolVar(&verbose, "verbose", false, "Verbose logging")
	flag.StringVar(&target, "target", "", "Remote write URL")
	flag.IntVar(&concurrency, "concurrency", 1, "Concurrent writers")
	flag.IntVar(&batchSize, "batch-size", 2500, "Batch size of requests")
	flag.BoolVar(&dryRun, "dry-run", false, "Read data but don't send it")
	flag.Int64Var(&totalSamples, "total-samples", 0, "Total samples to send (0 = continuous)")
	flag.Parse()

	// Reject configurations that can never make progress instead of hanging:
	// empty batches never advance the sample count, and without a target or
	// any writer the queue fills up and is never drained.
	if batchSize < 1 {
		fmt.Println("-batch-size must be at least 1")
		return
	}
	if !dryRun {
		if target == "" {
			fmt.Println("-target is required unless -dry-run is set")
			return
		}
		if concurrency < 1 {
			fmt.Println("-concurrency must be at least 1")
			return
		}
	}

	target = fmt.Sprintf("%s/v1/logs", target)

	var dataGen generator = &continuousDataGenerator{
		set: data.NewDataSet(data.SetOptions{
			NumMetrics:  metrics,
			Cardinality: cardinality,
		}),
		startTime:    time.Now().UTC(),
		batchSize:    batchSize,
		totalSamples: totalSamples,
	}

	batches := make(chan *v1.ExportLogsServiceRequest, 1000)
	runStats := newStats()

	// Start the timer before any goroutine exists so reportStats never reads
	// StartTime concurrently with this write.
	runStats.StartTimer()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			writer(ctx, target, runStats, batches)
		}()
	}

	go reportStats(ctx, runStats, batches)

	for {
		batch, err := dataGen.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println(err.Error())
			return
		}

		if dryRun {
			// Nothing is sent in dry-run mode; count the records directly.
			for _, rl := range batch.ResourceLogs {
				for _, sl := range rl.ScopeLogs {
					runStats.IncTotalSent(len(sl.LogRecords))
				}
			}
			continue
		}
		batches <- batch
	}

	// Wait for the writers to pick up everything that was queued.
	for len(batches) > 0 {
		time.Sleep(100 * time.Millisecond)
	}

	// Stop the writers and wait for their in-flight requests before stopping
	// the timer, so the measured duration covers every counted sample.
	cancel()
	wg.Wait()
	runStats.StopTimer()

	fmt.Printf("Total Metrics: %d\n", runStats.TotalMetrics())
	// Sent records are tracked via IncTotalSent; the series counter is never
	// incremented by this program, so reporting it would always print zero.
	fmt.Printf("Total Samples: %d\n", runStats.TotalSent())
	fmt.Printf("Duration: %s\n", runStats.TimerDuration().String())
	fmt.Printf("Load samples per/sec: %0.2f\n", float64(runStats.TotalSent())/runStats.TimerDuration().Seconds())
}
// reportStats prints a progress line once per second until ctx is cancelled.
//
// Each line shows the number of samples sent since the previous tick, the
// average rate since the stats timer was started, the running total with the
// elapsed time, and the current queue depth of batches alongside the series
// counter.
func reportStats(ctx context.Context, stats *stats, batches chan *v1.ExportLogsServiceRequest) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	var lastTotal int64
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Snapshot the counter and clock once per tick. Reading the counter
			// separately for the delta and for lastTotal would drop any samples
			// recorded between the two reads from every per-second figure.
			total := stats.TotalSent()
			elapsed := stats.ElapsedDuration()

			fmt.Printf("Samples per/sec: %d (%0.2f samples/sec) (%d/%s) queued=%d/%d\n",
				total-lastTotal,
				float64(total)/elapsed.Seconds(),
				total,
				elapsed,
				len(batches),
				stats.TotalSeries())

			lastTotal = total
		}
	}
}
// writer receives batches from ch, serializes each one as protobuf and POSTs
// it to endpoint, retrying once per second until the write succeeds. After a
// successful write the number of log records in the batch is added to stats.
//
// writer returns when ctx is cancelled. If cancellation arrives while a batch
// is being retried, that batch is dropped and not counted. A marshal or
// request-construction failure terminates the process via logger.Fatalf.
func writer(ctx context.Context, endpoint string, stats *stats, ch chan *v1.ExportLogsServiceRequest) {
	for {
		select {
		case <-ctx.Done():
			return
		case batch := <-ch:
			payload, err := proto.Marshal(batch)
			if err != nil {
				logger.Fatalf("marshal: %s", err)
			}

			if !postLogsWithRetry(ctx, endpoint, payload) {
				return
			}

			for _, rl := range batch.ResourceLogs {
				for _, sl := range rl.ScopeLogs {
					stats.IncTotalSent(len(sl.LogRecords))
				}
			}
		}
	}
}

// postLogsWithRetry calls postLogs until it succeeds, waiting one second
// between attempts. It reports false if ctx is cancelled while waiting.
func postLogsWithRetry(ctx context.Context, endpoint string, payload []byte) bool {
	for {
		err := postLogs(endpoint, payload)
		if err == nil {
			return true
		}
		logger.Warnf("write failed: %s. Retrying...", err)

		select {
		case <-ctx.Done():
			return false
		case <-time.After(1 * time.Second):
		}
	}
}

// postLogs performs a single POST of payload to endpoint and returns an error
// for transport failures and for non-2xx responses.
func postLogs(endpoint string, payload []byte) error {
	// Build a fresh request for every attempt: the body reader of a request
	// is consumed by the attempt that sends it, so a reused request may go
	// out with a short or empty body on retry.
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		// A request that cannot be constructed will never succeed on retry.
		logger.Fatalf("new request: %s", err)
	}
	req.Header.Set("Content-Type", "application/x-protobuf")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Drain the body so the underlying connection can be reused. A failure
	// here does not change the outcome of the write, so it is ignored.
	_, _ = io.Copy(io.Discard, resp.Body)

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("unexpected response status %s", resp.Status)
	}
	return nil
}
// continuousDataGenerator produces synthetic log export requests, either
// indefinitely or until a configured number of log records has been produced.
type continuousDataGenerator struct {
	// set is supplied by main; Read does not consult it.
	set *data.Set
	// startTime is advanced by one second on every Read.
	startTime time.Time
	// batchSize is the number of log records placed in each request.
	batchSize int
	// totalSamples caps the number of records produced; 0 means no limit.
	totalSamples int64
	// sent is the number of records produced so far; accessed atomically.
	sent int64
}

// Read returns the next batch of log records.
//
// Each batch holds batchSize records, except that the final batch is trimmed
// so the overall count never exceeds totalSamples. Once totalSamples records
// have been produced (and totalSamples > 0), Read returns io.EOF.
func (c *continuousDataGenerator) Read() (*v1.ExportLogsServiceRequest, error) {
	count := int64(c.batchSize)
	if c.totalSamples > 0 {
		remaining := c.totalSamples - atomic.LoadInt64(&c.sent)
		if remaining <= 0 {
			return nil, io.EOF
		}
		// Trim the last batch instead of overshooting the requested total.
		if count > remaining {
			count = remaining
		}
	}

	msg := newExportLogRequest()
	scope := msg.ResourceLogs[0].ScopeLogs[0]
	if count > 0 {
		scope.LogRecords = make([]*otelv1.LogRecord, 0, count)
	}
	for i := int64(0); i < count; i++ {
		scope.LogRecords = append(scope.LogRecords, newLoadLogRecord())
	}

	c.startTime = c.startTime.Add(time.Second)
	atomic.AddInt64(&c.sent, count)
	return msg, nil
}

// newLoadLogRecord builds one synthetic error-severity log record stamped with
// the current time and carrying the message and Kusto routing keys in its body.
func newLoadLogRecord() *otelv1.LogRecord {
	stringField := func(key, value string) *commonv1.KeyValue {
		return &commonv1.KeyValue{
			Key: key,
			Value: &commonv1.AnyValue{
				Value: &commonv1.AnyValue_StringValue{StringValue: value},
			},
		}
	}

	return &otelv1.LogRecord{
		TimeUnixNano:   uint64(time.Now().UnixNano()),
		SeverityNumber: 17,
		SeverityText:   "Error",
		Body: &commonv1.AnyValue{
			Value: &commonv1.AnyValue_KvlistValue{
				KvlistValue: &commonv1.KeyValueList{
					Values: []*commonv1.KeyValue{
						stringField("message", "something worth logging"),
						stringField("kusto.table", "IngestTest"),
						stringField("kusto.database", "AKSinfra"),
					},
				},
			},
		},
		DroppedAttributesCount: 1,
		Flags:                  1,
	}
}
// stats collects the counters and timestamps of a load run.
//
// The two int64 counters are only accessed through sync/atomic. The metrics
// set and the timestamps are plain fields with no synchronization of their own.
type stats struct {
	totalSeries         int64
	totalSent           int64
	metrics             map[string]struct{}
	StartTime, StopTime time.Time
}

// newStats returns a stats value with an empty, ready-to-use metrics set.
func newStats() *stats {
	s := new(stats)
	s.metrics = make(map[string]struct{})
	return s
}

// IncTotalSent atomically adds n to the sent counter.
func (s *stats) IncTotalSent(n int) {
	delta := int64(n)
	atomic.AddInt64(&s.totalSent, delta)
}

// TotalSent atomically reads the sent counter.
func (s *stats) TotalSent() int64 {
	total := atomic.LoadInt64(&s.totalSent)
	return total
}

// IncTotalSeries atomically adds n to the series counter.
func (s *stats) IncTotalSeries(n int) {
	delta := int64(n)
	atomic.AddInt64(&s.totalSeries, delta)
}

// TotalSeries atomically reads the series counter.
func (s *stats) TotalSeries() int64 {
	total := atomic.LoadInt64(&s.totalSeries)
	return total
}

// RecordMetric adds name to the set of recorded metric names.
func (s *stats) RecordMetric(name string) {
	var present struct{}
	s.metrics[name] = present
}

// TotalMetrics returns the number of distinct names recorded so far.
func (s *stats) TotalMetrics() int {
	distinct := len(s.metrics)
	return distinct
}

// StartTimer sets StartTime to the current time.
func (s *stats) StartTimer() {
	now := time.Now()
	s.StartTime = now
}

// StopTimer sets StopTime to the current time.
func (s *stats) StopTimer() {
	now := time.Now()
	s.StopTime = now
}

// ElapsedDuration returns the time that has passed since StartTime.
func (s *stats) ElapsedDuration() time.Duration {
	begin := s.StartTime
	return time.Since(begin)
}

// TimerDuration returns the span between StartTime and StopTime.
func (s *stats) TimerDuration() time.Duration {
	begin, end := s.StartTime, s.StopTime
	return end.Sub(begin)
}
// newExportLogRequest returns an export request skeleton holding a single
// ResourceLogs entry (one "source" resource attribute) with a single ScopeLogs
// entry that has its scope populated and no log records.
func newExportLogRequest() *v1.ExportLogsServiceRequest {
	sourceAttr := &commonv1.KeyValue{
		Key: "source",
		Value: &commonv1.AnyValue{
			Value: &commonv1.AnyValue_StringValue{
				StringValue: "hostname",
			},
		},
	}

	resource := &resourcev1.Resource{
		Attributes:             []*commonv1.KeyValue{sourceAttr},
		DroppedAttributesCount: 1,
	}

	scopeLogs := &otelv1.ScopeLogs{
		Scope: &commonv1.InstrumentationScope{
			Name:                   "name",
			Version:                "version",
			DroppedAttributesCount: 1,
		},
	}

	resourceLogs := &otelv1.ResourceLogs{
		Resource:  resource,
		ScopeLogs: []*otelv1.ScopeLogs{scopeLogs},
	}

	return &v1.ExportLogsServiceRequest{
		ResourceLogs: []*otelv1.ResourceLogs{resourceLogs},
	}
}
// rawlog is a sample log export request in JSON form. It is not referenced
// elsewhere in this file. The attribute object under "resource" previously
// lacked the comma between its "key" and "value" members, which made the
// whole document invalid JSON.
var rawlog = []byte(`{
"resourceLogs": [
{
"resource": {
"attributes": [
{
"key": "source",
"value": {
"stringValue": "hostname"
}
}
],
"droppedAttributesCount": 1
},
"scopeLogs": [
{
"scope": {
"name": "name",
"version": "version",
"droppedAttributesCount": 1
},
"logRecords": [
{
"timeUnixNano": "1669112524001",
"observedTimeUnixNano": "1669112524001",
"severityNumber": 17,
"severityText": "Error",
"body": {
"kvlistValue": {
"values": [
{
"key": "message",
"value": {
"stringValue": "something worth logging"
}
},
{
"key": "kusto.table",
"value": {
"stringValue": "ATable"
}
},
{
"key": "kusto.database",
"value": {
"stringValue": "ADatabase"
}
}
]
}
},
"droppedAttributesCount": 1,
"flags": 1,
"traceId": "",
"spanId": ""
}
],
"schemaUrl": "scope_schema"
}
],
"schemaUrl": "resource_schema"
}
]
}`)