ingestor/adx/uploader.go
package adx

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"regexp"
	"sync"
	"time"

	"github.com/Azure/adx-mon/ingestor/cluster"
	"github.com/Azure/adx-mon/metrics"
	"github.com/Azure/adx-mon/pkg/logger"
	"github.com/Azure/adx-mon/pkg/service"
	"github.com/Azure/adx-mon/pkg/testutils"
	"github.com/Azure/adx-mon/pkg/wal"
	adxschema "github.com/Azure/adx-mon/schema"
	"github.com/Azure/azure-kusto-go/kusto"
	"github.com/Azure/azure-kusto-go/kusto/ingest"
	"github.com/Azure/azure-kusto-go/kusto/kql"
)

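// ConcurrentUploads is the default number of concurrent upload workers.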
const ConcurrentUploads = 50
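
// Uploader uploads batches of WAL segments to an Azure Data Explorer (Kusto) database.
//
// Typical usage is roughly the following sketch (the kusto client, ctx, and batch are
// assumed to exist elsewhere; the field values are illustrative only):
//
//	up := NewUploader(kustoCli, UploaderOpts{Database: "Metrics", ConcurrentUploads: ConcurrentUploads})
//	if err := up.Open(ctx); err != nil {
//		return err
//	}
//	defer up.Close()
//	up.UploadQueue() <- batch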
type Uploader interface {
	service.Component

	Database() string
	Endpoint() string

	// UploadQueue returns a channel that can be used to upload files to kusto.
	UploadQueue() chan *cluster.Batch

	// Mgmt executes a management query against the database.
	Mgmt(ctx context.Context, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
}

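// uploader is the concrete Uploader implementation. It lazily creates one
// ingest.Ingestor per destination table and reuses it across uploads.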
type uploader struct {
	KustoCli            *kusto.Client
	storageDir          string
	database            string
	opts                UploaderOpts
	syncer              *Syncer
	queue               chan *cluster.Batch
	closeFn             context.CancelFunc
	wg                  sync.WaitGroup
	mu                  sync.RWMutex
	ingestors           map[string]ingest.Ingestor
	requireDirectIngest bool
}

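// UploaderOpts configures a new uploader.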
type UploaderOpts struct {
	StorageDir        string
	Database          string
	ConcurrentUploads int
	Dimensions        []string
	DefaultMapping    adxschema.SchemaMapping
	SampleType        SampleType
}

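// NewUploader returns an uploader that ingests WAL segments into the given
// database using kustoCli. Open must be called before batches are queued.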
func NewUploader(kustoCli *kusto.Client, opts UploaderOpts) *uploader {
	syncer := NewSyncer(kustoCli, opts.Database, opts.DefaultMapping, opts.SampleType)

	return &uploader{
		KustoCli:   kustoCli,
		syncer:     syncer,
		storageDir: opts.StorageDir,
		database:   opts.Database,
		opts:       opts,
		queue:      make(chan *cluster.Batch, 10000),
		ingestors:  make(map[string]ingest.Ingestor),
	}
}

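// Open starts the uploader: it opens the schema syncer, detects whether the
// target cluster only supports direct ingestion (e.g. a Kustainer), and starts
// the configured number of upload workers.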
func (n *uploader) Open(ctx context.Context) error {
	c, closeFn := context.WithCancel(ctx)
	n.closeFn = closeFn

	if err := n.syncer.Open(c); err != nil {
		return err
	}

	requireDirectIngest, err := n.clusterRequiresDirectIngest(ctx)
	if err != nil {
		return err
	}
	if requireDirectIngest {
		logger.Warnf("Cluster=%s requires direct ingest: database=%s", n.KustoCli.Endpoint(), n.database)
		n.requireDirectIngest = true
	}

	for i := 0; i < n.opts.ConcurrentUploads; i++ {
		go n.upload(c)
	}

	return nil
}

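// Close stops the upload workers, waits for in-flight uploads to finish, and
// closes all cached ingestors before closing the syncer.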
func (n *uploader) Close() error {
	n.closeFn()

	// Wait for all uploads to finish.
	n.wg.Wait()

	n.mu.Lock()
	defer n.mu.Unlock()

	for _, ing := range n.ingestors {
		ing.Close()
	}
	n.ingestors = nil

	return n.syncer.Close()
}

func (n *uploader) UploadQueue() chan *cluster.Batch {
	return n.queue
}

func (n *uploader) Database() string {
	return n.database
}

func (n *uploader) Endpoint() string {
	return n.KustoCli.Endpoint()
}

func (n *uploader) Mgmt(ctx context.Context, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
	return n.KustoCli.Mgmt(ctx, n.database, query, options...)
}

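// uploadReader ensures the destination table and its CSV ingestion mapping
// exist, then ingests the contents of reader into the table, waiting up to 10
// minutes for the ingestion to complete.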
func (n *uploader) uploadReader(reader io.Reader, database, table string, mapping adxschema.SchemaMapping) error {
	// Ensure we wait for this upload to finish.
	n.wg.Add(1)
	defer n.wg.Done()

	if err := n.syncer.EnsureTable(table, mapping); err != nil {
		return err
	}

	name, err := n.syncer.EnsureMapping(table, mapping)
	if err != nil {
		return err
	}

	n.mu.RLock()
	ingestor := n.ingestors[table]
	n.mu.RUnlock()

	if ingestor == nil {
		ingestor, err = func() (ingest.Ingestor, error) {
			n.mu.Lock()
			defer n.mu.Unlock()

			// Re-check under the write lock in case another goroutine created the ingestor.
			ingestor = n.ingestors[table]
			if ingestor != nil {
				return ingestor, nil
			}

			if n.requireDirectIngest {
				ingestor = testutils.NewUploadReader(n.KustoCli, database, table)
				n.ingestors[table] = ingestor
				return ingestor, nil
			}

			ingestor, err = ingest.New(n.KustoCli, n.database, table)
			if err != nil {
				return nil, err
			}

			n.ingestors[table] = ingestor
			return ingestor, nil
		}()
		if err != nil {
			return err
		}
	}

	// Cap the total time for this ingestion at 10 minutes.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	// Upload our stream WITHOUT status reporting. The caller removes the local
	// segments once the upload has completed.
	res, err := ingestor.FromReader(ctx, reader, ingest.IngestionMappingRef(name, ingest.CSV))
	if err != nil {
		return sanitizeErrorString(err)
	}

	err = <-res.Wait(ctx)
	if err != nil {
		return err
	}

	return nil
}

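// sanitizeErrorString redacts SAS signature values (sig=...) from an error's
// message so they are not leaked into logs.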
func sanitizeErrorString(err error) error {
	errString := err.Error()

	r := regexp.MustCompile(`sig=[a-zA-Z0-9]+`)
	errString = r.ReplaceAllString(errString, "sig=REDACTED")

	// Use an explicit format string so any '%' remaining in the error text is not
	// interpreted as a formatting directive.
	return fmt.Errorf("%s", errString)
}

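// upload is the worker loop run by Open. It drains batches from the queue,
// opens each batch's WAL segments, concatenates them into a single stream, and
// uploads that stream with uploadReader. Segments that no longer exist on disk
// are skipped because batches are not disjoint.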
func (n *uploader) upload(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case batch := <-n.queue:
			segments := batch.Segments

			if batch.Database != n.database {
				logger.Errorf("Database mismatch: %s != %s. Skipping batch", batch.Database, n.database)
				continue
			}

			func() {
				defer batch.Release()

				var (
					readers        = make([]io.Reader, 0, len(segments))
					segmentReaders = make([]*wal.SegmentReader, 0, len(segments))
					database       string
					table          string
					schema         string
					header         string
					err            error
				)

				for _, si := range segments {
					database, table, schema, _, err = wal.ParseFilename(si.Path)
					if err != nil {
						logger.Errorf("Failed to parse file: %s", err.Error())
						continue
					}
					metrics.SampleLatency.WithLabelValues(database, table).Set(time.Since(si.CreatedAt).Seconds())

					var opts []wal.Option
					if schema != "" {
						opts = append(opts, wal.WithSkipHeader)
					}

					f, err := wal.NewSegmentReader(si.Path, opts...)
					if os.IsNotExist(err) {
						// Batches are not disjoint, so the same segments could be included in multiple batches.
						continue
					} else if err != nil {
						logger.Errorf("Failed to open file: %s", err.Error())
						continue
					}

					segmentReaders = append(segmentReaders, f)
					readers = append(readers, f)
				}

				defer func(segmentReaders []*wal.SegmentReader) {
					for _, sr := range segmentReaders {
						sr.Close()
					}
				}(segmentReaders)

				if len(segmentReaders) == 0 {
					if err := batch.Remove(); err != nil {
						logger.Errorf("Failed to remove batch: %s", err.Error())
					}
					return
				}

				samplePath := segmentReaders[0].Path()
				database, table, schema, _, err = wal.ParseFilename(samplePath)
				if err != nil {
					logger.Errorf("Failed to parse file: %s: %s", samplePath, err.Error())
					return
				}

				mapping := n.opts.DefaultMapping
				if schema != "" {
					header, err = n.extractSchema(samplePath)
					if err != nil {
						logger.Errorf("Failed to extract schema: %s: %s", samplePath, err.Error())
						// This batch is invalid, remove it.
						if err := batch.Remove(); err != nil {
							logger.Errorf("Failed to remove batch: %s", err.Error())
						}
						return
					}

					mapping, err = adxschema.UnmarshalSchema(header)
					if err != nil {
						logger.Errorf("Failed to unmarshal schema: %s: %s", samplePath, err.Error())
						// This batch is invalid, remove it.
						if err := batch.Remove(); err != nil {
							logger.Errorf("Failed to remove batch: %s", err.Error())
						}
						return
					}
				}

				mr := io.MultiReader(readers...)

				now := time.Now()
				if err := n.uploadReader(mr, database, table, mapping); err != nil {
					logger.Errorf("Failed to upload file: %s", err.Error())
					return
				}

				if logger.IsDebug() {
					logger.Debugf("Uploaded %v duration=%s", segments, time.Since(now).String())
				}

				if err := batch.Remove(); err != nil {
					logger.Errorf("Failed to remove batch: %s", err.Error())
				}

				for _, sr := range segmentReaders {
					sampleType, sampleCount := sr.SampleMetadata()
					switch sampleType {
					case wal.MetricSampleType:
						metrics.MetricsUploaded.WithLabelValues(database, table).Add(float64(sampleCount))
					case wal.LogSampleType:
						metrics.LogsUploaded.WithLabelValues(database, table).Add(float64(sampleCount))
					}
				}
			}()
		}
	}
}

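// extractSchema reads the first line of the segment at path, which holds the
// serialized schema header for schema-bearing segments.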
func (n *uploader) extractSchema(path string) (string, error) {
	f, err := wal.NewSegmentReader(path)
	if err != nil {
		return "", err
	}
	defer f.Close()

	b := make([]byte, 4096)
	nn, err := f.Read(b)
	if err != nil {
		return "", err
	}
	b = b[:nn]

	idx := bytes.IndexByte(b, '\n')
	if idx != -1 {
		return string(b[:idx]), nil
	}
	return string(b), nil
}

// clusterRequiresDirectIngest checks if the cluster is configured to require direct ingest.
// In particular, if the cluster details report a Name of KustoPersonal, we know this
// cluster to be a Kustainer, which does not support queued or streaming ingestion.
// https://learn.microsoft.com/en-us/azure/data-explorer/kusto-emulator-overview#limitations
func (n *uploader) clusterRequiresDirectIngest(ctx context.Context) (bool, error) {
	stmt := kql.New(".show cluster details")
	rows, err := n.KustoCli.Mgmt(ctx, n.database, stmt)
	if err != nil {
		return false, fmt.Errorf("failed to query cluster details: %w", err)
	}
	defer rows.Stop()

	for {
		row, errInline, errFinal := rows.NextRowOrError()
		if errFinal == io.EOF {
			break
		}
		if errInline != nil {
			continue
		}
		if errFinal != nil {
			return false, fmt.Errorf("failed to retrieve cluster details: %w", errFinal)
		}

		var cs clusterDetails
		if err := row.ToStruct(&cs); err != nil {
			return false, fmt.Errorf("failed to convert row to struct: %w", err)
		}
		return cs.Name == "KustoPersonal", nil
	}
	return false, nil
}

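// clusterDetails maps a row returned by `.show cluster details`.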
type clusterDetails struct {
	NodeId                 string    `kusto:"NodeId"`
	Address                string    `kusto:"Address"`
	Name                   string    `kusto:"Name"`
	StartTime              time.Time `kusto:"StartTime"`
	AssignedHotExtents     int       `kusto:"AssignedHotExtents"`
	IsAdmin                bool      `kusto:"IsAdmin"`
	MachineTotalMemory     int64     `kusto:"MachineTotalMemory"`
	MachineAvailableMemory int64     `kusto:"MachineAvailableMemory"`
	ProcessorCount         int       `kusto:"ProcessorCount"`
	HotExtentsOriginalSize int64     `kusto:"HotExtentsOriginalSize"`
	HotExtentsSize         int64     `kusto:"HotExtentsSize"`
	EnvironmentDescription string    `kusto:"EnvironmentDescription"`
	ProductVersion         string    `kusto:"ProductVersion"`
	Reserved0              int       `kusto:"Reserved0"`
	ClockDescription       string    `kusto:"ClockDescription"`
	RuntimeDescription     string    `kusto:"RuntimeDescription"`
}