store/writer.go (69 lines of code) (raw):
package store
import (
"bytes"
"context"
"io"
"sync"
"time"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/dodopizza/jaeger-kusto/config"
"github.com/hashicorp/go-hclog"
"github.com/jaegertracing/jaeger/model"
"github.com/tushar2708/altcsv"
)
type kustoIngest interface {
FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error)
}
type kustoSpanWriter struct {
batchMaxBytes int
batchTimeout time.Duration
workersCount int
ingest kustoIngest
logger hclog.Logger
spanInput chan []string
shutdown chan struct{}
shutdownWg sync.WaitGroup
disableJaegerUiTraces bool
}
func newKustoSpanWriter(factory *kustoFactory, logger hclog.Logger, pc *config.PluginConfig) (*kustoSpanWriter, error) {
in, err := factory.Ingest()
if err != nil {
return nil, err
}
writer := &kustoSpanWriter{
batchMaxBytes: factory.PluginConfig.WriterBatchMaxBytes,
batchTimeout: time.Duration(factory.PluginConfig.WriterBatchTimeoutSeconds) * time.Second,
workersCount: factory.PluginConfig.WriterWorkersCount,
ingest: in,
logger: logger,
spanInput: make(chan []string, factory.PluginConfig.WriterSpanBufferSize),
shutdown: make(chan struct{}),
shutdownWg: sync.WaitGroup{},
disableJaegerUiTraces: pc.DisableJaegerUiTraces,
}
for i := 0; i < writer.workersCount; i++ {
go writer.ingestWorker()
}
return writer, nil
}
func (kw *kustoSpanWriter) WriteSpan(_ context.Context, span *model.Span) error {
spanStringArray, err := TransformSpanToStringArray(span)
kw.spanInput <- spanStringArray
return err
}
func (kw *kustoSpanWriter) Close() error {
kw.logger.Debug("plugin shutdown started")
kw.shutdownWg.Add(kw.workersCount)
kw.shutdown <- struct{}{}
kw.shutdownWg.Wait()
close(kw.spanInput)
kw.logger.Debug("plugin shutdown completed")
return nil
}
func (kw *kustoSpanWriter) ingestWorker() {
ticker := time.NewTicker(kw.batchTimeout)
defer ticker.Stop()
b := &bytes.Buffer{}
writer := altcsv.NewWriter(b)
writer.AllQuotes = true
}