ingestor/adx/syncer.go (382 lines of code) (raw):
package adx
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/schema"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/unsafe"
"github.com/cespare/xxhash"
)
// SampleType identifies the kind of telemetry a Syncer is configured for. It
// selects which database functions and which ingestion batching policy the
// Syncer sets up.
type SampleType int
const (
// PromMetrics selects the Prometheus-metrics setup.
PromMetrics SampleType = iota
// OTLPLogs selects the OTLP-logs setup.
OTLPLogs
)
// columnDef holds a single column's name and its data type, both as plain
// strings.
type columnDef struct {
	name string
	typ  string
}
// mgmt is the slice of the Kusto client that Syncer depends on: the ability
// to run management statements against a database. Keeping it as an interface
// lets a different implementation be supplied to NewSyncer.
type mgmt interface {
// Mgmt runs the management statement query against database db and returns
// an iterator over the resulting rows.
Mgmt(ctx context.Context, db string, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
}
// Syncer keeps a Kusto database in the shape ingestion expects: it creates
// tables and CSV ingestion mappings on demand, installs helper functions and
// the ingestion batching policy in Open, and caches what it has already
// created so repeated calls do not hit the database again.
type Syncer struct {
// KustoCli runs management commands against the database.
KustoCli mgmt
// database is the name of the database every command is issued against.
database string
// mu guards tables and mappings in EnsureTable, EnsureMapping and
// reconcileMappings. Note that loadIngestionMappings writes mappings
// without holding it.
mu sync.RWMutex
// mappings caches known ingestion mappings, keyed by mapping name.
mappings map[string]schema.SchemaMapping
// st selects the sample-type specific functions and batching policy.
st SampleType
// tables is the set of table names EnsureTable has already created.
tables map[string]struct{}
// defaultMapping is the schema used by EnsureDefaultTable and
// EnsureDefaultMapping.
defaultMapping schema.SchemaMapping
// cancelFn cancels the context of the goroutine started by Open. It is nil
// until Open has completed its setup steps.
cancelFn context.CancelFunc
}
// IngestionMapping is the struct a result row of
// ".show database <db> ingestion mappings" is decoded into; the kusto struct
// tags name the result columns each field is read from.
type IngestionMapping struct {
// Name is the mapping name; it is used as the key of the Syncer's cache.
Name string `kusto:"Name"`
Kind string `kusto:"Kind"`
// Mapping is the mapping definition as JSON text; loadIngestionMappings
// unmarshals it into a schema.SchemaMapping.
Mapping string `kusto:"Mapping"`
LastUpdatedOn time.Time `kusto:"LastUpdatedOn"`
Database string `kusto:"Database"`
Table string `kusto:"Table"`
}
// Table is the struct a result row of ".show tables | project TableName" is
// decoded into.
type Table struct {
// TableName is read from the TableName result column.
TableName string `kusto:"TableName"`
}
// NewSyncer returns a Syncer that issues management commands for database
// through kustoCli. defaultMapping is the schema used by EnsureDefaultTable
// and EnsureDefaultMapping, and st selects the sample-type specific setup
// performed by Open. The mapping and table caches start out empty.
func NewSyncer(kustoCli mgmt, database string, defaultMapping schema.SchemaMapping, st SampleType) *Syncer {
	syncer := new(Syncer)
	syncer.KustoCli = kustoCli
	syncer.database = database
	syncer.st = st
	syncer.defaultMapping = defaultMapping
	syncer.mappings = map[string]schema.SchemaMapping{}
	syncer.tables = map[string]struct{}{}
	return syncer
}
// Open prepares the database and starts background reconciliation. In order,
// it loads the existing ingestion mappings into the cache, creates the
// sample-type specific functions and applies the ingestion batching policy.
// The first failing step aborts Open and its error is returned as-is. On
// success a goroutine running reconcileMappings is started on a context
// derived from ctx; Close cancels that context.
func (s *Syncer) Open(ctx context.Context) error {
	setup := []func(context.Context) error{
		s.loadIngestionMappings,
		s.ensureFunctions,
		s.ensureIngestionPolicy,
	}
	for _, step := range setup {
		if err := step(ctx); err != nil {
			return err
		}
	}

	reconcileCtx, cancel := context.WithCancel(ctx)
	s.cancelFn = cancel
	go s.reconcileMappings(reconcileCtx)
	return nil
}
// Close stops the background reconciliation goroutine started by Open by
// cancelling its context. It is safe to call on a Syncer whose Open was never
// called or failed before the goroutine was started; in that case there is
// nothing to cancel. It always returns nil.
func (s *Syncer) Close() error {
	// cancelFn is only assigned at the end of a successful Open; calling a nil
	// func value would panic.
	if s.cancelFn != nil {
		s.cancelFn()
	}
	return nil
}
// loadIngestionMappings reads every ingestion mapping defined in the database
// and stores it in s.mappings under the mapping's name, so EnsureMapping can
// skip mappings that already exist.
//
// It returns the first error encountered: from issuing the command, from the
// row iterator, from decoding a row, or from unmarshalling a mapping's JSON
// definition. Mappings stored before the failure remain in the cache.
func (s *Syncer) loadIngestionMappings(ctx context.Context) error {
	query := fmt.Sprintf(".show database %s ingestion mappings", s.database)
	stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(query)
	rows, err := s.KustoCli.Mgmt(ctx, s.database, stmt)
	if err != nil {
		return err
	}
	// Always release the iterator, including on the early error returns below
	// that would otherwise abandon it mid-stream.
	defer rows.Stop()

	for {
		row, inlineErr, finalErr := rows.NextRowOrError()
		// io.EOF is the normal end of the result set and is checked first.
		if finalErr == io.EOF {
			return nil
		}
		if inlineErr != nil {
			return inlineErr
		}
		if finalErr != nil {
			return finalErr
		}

		var v IngestionMapping
		if err := row.ToStruct(&v); err != nil {
			return err
		}

		var sm schema.SchemaMapping
		if err := json.Unmarshal([]byte(v.Mapping), &sm); err != nil {
			return err
		}

		logger.Infof("Loaded %s ingestion mapping %s", s.database, v.Name)
		s.mappings[v.Name] = sm
	}
}
// EnsureDefaultTable makes sure table exists with the Syncer's default schema
// mapping. It delegates to EnsureTable and returns its error.
func (s *Syncer) EnsureDefaultTable(table string) error {
	mapping := s.defaultMapping
	return s.EnsureTable(table, mapping)
}
// EnsureTable creates table with one column per entry of mapping, or merges
// those columns into an existing table, by issuing a ".create-merge table"
// command.
//
// Tables that were created successfully are remembered, so later calls for the
// same table return nil without contacting the database. The command runs on
// context.Background() because the method takes no context. Any error from
// issuing the command or from draining its result is returned and the table is
// not marked as created.
func (s *Syncer) EnsureTable(table string, mapping schema.SchemaMapping) error {
	s.mu.RLock()
	_, known := s.tables[table]
	s.mu.RUnlock()
	if known {
		return nil
	}

	// Build: .create-merge table ['<table>'] (['<col>']:<type>, ...)
	var sb strings.Builder
	sb.WriteString(".create-merge table ")
	fmt.Fprintf(&sb, "['%s'] (", table)
	for i, c := range mapping {
		if i > 0 {
			sb.WriteString(", ")
		}
		fmt.Fprintf(&sb, "['%s']:%s", c.Column, c.DataType)
	}
	sb.WriteString(")")

	if logger.IsDebug() {
		logger.Debugf("Creating table %s %s", table, sb.String())
	}

	stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(sb.String())
	rows, err := s.KustoCli.Mgmt(context.Background(), s.database, stmt)
	if err != nil {
		return err
	}
	// Release the iterator on every exit path.
	defer rows.Stop()

	// Drain the result so that errors reported through the iterator surface.
	for {
		_, inlineErr, finalErr := rows.NextRowOrError()
		if finalErr == io.EOF {
			break
		}
		if inlineErr != nil {
			return inlineErr
		}
		if finalErr != nil {
			return finalErr
		}
	}

	s.mu.Lock()
	s.tables[table] = struct{}{}
	s.mu.Unlock()
	return nil
}
// EnsureDefaultMapping makes sure an ingestion mapping built from the Syncer's
// default schema mapping exists for table. It delegates to EnsureMapping and
// returns the mapping's name.
func (s *Syncer) EnsureDefaultMapping(table string) (string, error) {
	mapping := s.defaultMapping
	return s.EnsureMapping(table, mapping)
}
// EnsureMapping creates a CSV ingestion mapping for table from mapping if it
// is not already cached, and returns the mapping's name.
//
// The name is "<kind>_<hash>", where kind is the normalized table name and
// hash is the xxhash of kind followed by every normalized column name, so a
// different table or column list yields a different name. When the name is
// already cached nothing is sent to the database. Otherwise a
// ".create-or-alter table ... ingestion csv mapping" command is issued on
// context.Background() and, once it succeeds, the mapping is cached.
//
// The Syncer's lock is held for the whole call, including the database round
// trip. On failure the returned name is empty.
func (s *Syncer) EnsureMapping(table string, mapping schema.SchemaMapping) (string, error) {
	var b bytes.Buffer
	kind := schema.NormalizeMetricName([]byte(table))
	b.Write(kind)
	for _, v := range mapping {
		b.Write(schema.NormalizeMetricName([]byte(v.Column)))
	}
	name := fmt.Sprintf("%s_%d", string(kind), xxhash.Sum64(b.Bytes()))

	s.mu.Lock()
	defer s.mu.Unlock()

	if _, ok := s.mappings[name]; ok {
		return name, nil
	}

	jsonb, err := json.Marshal(mapping)
	if err != nil {
		return "", err
	}

	// Build: .create-or-alter table ['<table>'] ingestion csv mapping "<name>" '<json>'
	var sb strings.Builder
	sb.WriteString(".create-or-alter table ")
	fmt.Fprintf(&sb, "['%s'] ", table)
	fmt.Fprintf(&sb, "ingestion csv mapping \"%s\" '", name)
	sb.Write(jsonb)
	sb.WriteString("'")

	logger.Infof("Creating ingestion mapping for table %s %s", table, sb.String())

	stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(sb.String())
	rows, err := s.KustoCli.Mgmt(context.Background(), s.database, stmt)
	if err != nil {
		return "", err
	}
	// Release the iterator on every exit path.
	defer rows.Stop()

	// Drain the result so that errors reported through the iterator surface.
	for {
		_, inlineErr, finalErr := rows.NextRowOrError()
		if finalErr == io.EOF {
			break
		}
		if inlineErr != nil {
			return "", inlineErr
		}
		if finalErr != nil {
			return "", finalErr
		}
	}

	s.mappings[name] = mapping
	return name, nil
}
// ensureFunctions creates the database functions that belong to the Syncer's
// sample type. It returns an error for a sample type it does not know.
func (s *Syncer) ensureFunctions(ctx context.Context) error {
	if s.st == PromMetrics {
		return s.ensurePromMetricsFunctions(ctx)
	}
	if s.st == OTLPLogs {
		return s.ensureOTLPLogsFunctions(ctx)
	}
	return fmt.Errorf("unknown sample type %d", s.st)
}
// ensurePromMetricsFunctions installs the KQL helper functions used with
// Prometheus metrics (prom_increase, prom_rate, prom_delta and
// CountCardinality), preceded by the table that holds cardinality counts.
// Commands are issued one at a time and the first error stops the sequence
// and is returned.
func (s *Syncer) ensurePromMetricsFunctions(ctx context.Context) error {
	// run issues one management command and returns the error from Mgmt; the
	// result rows are discarded.
	run := func(command string) error {
		stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(command)
		_, err := s.KustoCli.Mgmt(ctx, s.database, stmt)
		return err
	}

	// This table is used to store the cardinality of all series in the database. It's updated by the CountCardinality function
	// but we can't create the function unless a table exists.
	if err := run(".create table AdxmonIngestorTableCardinalityCount (Timestamp: datetime, SeriesId: long, Labels: dynamic, Value: real)"); err != nil {
		return err
	}

	type kqlFunction struct {
		name string
		body string
	}

	// definitions lists the functions to create. They are executed in order;
	// prom_rate invokes prom_increase, so prom_increase comes first.
	definitions := []kqlFunction{
		{
			name: "prom_increase",
			body: `.create-or-alter function prom_increase (T:(Timestamp:datetime, SeriesId: long, Labels:dynamic, Value:real), interval:timespan=1m) {
T
| where isnan(Value)==false
| extend h=SeriesId
| partition hint.strategy=shuffle by h (
as Series
| order by h, Timestamp asc
| extend prevVal=prev(Value)
| extend diff=Value-prevVal
| extend Value=case(h == prev(h), case(diff < 0, next(Value)-Value, diff), real(0))
| project-away prevVal, diff, h
)}`,
		},
		{
			name: "prom_rate",
			body: `.create-or-alter function prom_rate (T:(Timestamp:datetime, SeriesId: long, Labels:dynamic, Value:real), interval:timespan=1m) {
T
| invoke prom_increase(interval=interval)
| extend Value=Value/((Timestamp-prev(Timestamp))/1s)
| where isnotnull(Value)
| where isnan(Value) == false}`,
		},
		{
			name: "prom_delta",
			body: `.create-or-alter function prom_delta (T:(Timestamp:datetime, SeriesId: long, Labels:dynamic, Value:real), interval:timespan=1m) {
T
| where isnan(Value)==false
| extend h=SeriesId
| partition hint.strategy=shuffle by h (
as Series
| order by h, Timestamp asc
| extend prevVal=prev(Value)
| extend diff=Value-prevVal
| extend Value=case(h == prev(h), case(diff < 0, next(Value)-Value, diff), real(0))
| project-away prevVal, diff, h
)}`,
		},
		{
			name: "CountCardinality",
			body: `.create-or-alter function CountCardinality () {
union withsource=table *
| where Timestamp >= ago(1h) and Timestamp < ago(5m)
| summarize Value=toreal(dcount(SeriesId)) by table
| extend SeriesId=hash_xxhash64(table)
| extend Timestamp=bin(now(), 1m)
| extend Labels=bag_pack_columns(table)
| project Timestamp, SeriesId, Labels, Value
}`,
		},
	}

	for _, def := range definitions {
		logger.Infof("Creating function %s", def.name)
		if err := run(def.body); err != nil {
			return err
		}
	}
	return nil
}
// ensureOTLPLogsFunctions is the OTLP-logs branch of ensureFunctions. It
// currently creates no functions and always returns nil; ctx is unused.
func (s *Syncer) ensureOTLPLogsFunctions(ctx context.Context) error {
return nil
}
// ensureIngestionPolicy applies the database-level ingestion batching policy
// with an ".alter-merge database ... policy ingestionbatching" command.
//
// The default policy batches for up to 30 seconds, 500 items or 1000 MB of raw
// data. For OTLP logs the time span is raised to 5 minutes and the size limit
// to 4096 MB. Errors from marshalling the policy or issuing the command are
// returned.
func (s *Syncer) ensureIngestionPolicy(ctx context.Context) error {
	type ingestionPolicy struct {
		MaximumBatchingTimeSpan string `json:"MaximumBatchingTimeSpan"`
		MaximumNumberOfItems    int    `json:"MaximumNumberOfItems"`
		MaximumRawDataSizeMB    int    `json:"MaximumRawDataSizeMB"`
	}

	var policy ingestionPolicy
	switch s.st {
	case OTLPLogs:
		// Optimize logs for throughput instead of latency
		policy = ingestionPolicy{
			MaximumBatchingTimeSpan: "00:05:00",
			MaximumNumberOfItems:    500,
			MaximumRawDataSizeMB:    4096,
		}
	default:
		policy = ingestionPolicy{
			MaximumBatchingTimeSpan: "00:00:30",
			MaximumNumberOfItems:    500,
			MaximumRawDataSizeMB:    1000,
		}
	}

	payload, err := json.Marshal(policy)
	if err != nil {
		return err
	}

	logger.Infof("Creating ingestion batching policy: Database=%s MaximumBatchingTimeSpan=%s, MaximumNumberOfItems=%d, MaximumRawDataSizeMB=%d",
		s.database, policy.MaximumBatchingTimeSpan, policy.MaximumNumberOfItems, policy.MaximumRawDataSizeMB)

	command := fmt.Sprintf(".alter-merge database %s policy ingestionbatching\n```%s\n```", s.database, string(payload))
	stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(command)
	if _, err := s.KustoCli.Mgmt(ctx, s.database, stmt); err != nil {
		return err
	}
	return nil
}
// reconcileMappings runs until ctx is cancelled and, once every 24 hours,
// drops cached ingestion mappings whose table no longer exists in the
// database. Failures are logged and the loop keeps running.
func (s *Syncer) reconcileMappings(ctx context.Context) {
	t := time.NewTicker(24 * time.Hour)
	defer t.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if err := s.pruneStaleMappings(ctx); err != nil {
				logger.Errorf("Error removing unused ingestion mappings: %s", err)
			}
		}
	}
}

// pruneStaleMappings removes from the mapping cache every entry whose table is
// not among the tables currently present in the database. Only the in-memory
// cache is modified; nothing is deleted in the database. When the database
// reports no tables at all the cache is left untouched.
func (s *Syncer) pruneStaleMappings(ctx context.Context) error {
	tables, err := s.loadTables(ctx)
	if err != nil {
		return fmt.Errorf("error loading table details: %w", err)
	}

	if len(tables) == 0 {
		logger.Warnf("No tables found in database %s. Skipping ingestion mapping cleanup.", s.database)
		return nil
	}

	tableExists := make(map[string]struct{}, len(tables))
	for _, v := range tables {
		tableExists[v.TableName] = struct{}{}
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	for k := range s.mappings {
		if _, ok := tableExists[mappingTableName(k)]; !ok {
			logger.Debugf("Removing cached ingestion mapping %s from %s", k, s.database)
			delete(s.mappings, k)
		}
	}
	return nil
}

// mappingTableName extracts the table part of a mapping name of the form
// "<table>_<hash>". The hash is the final underscore-separated component, so
// the name is cut at the last underscore; this keeps table parts that
// themselves contain underscores intact. A name without any underscore is
// returned unchanged.
func mappingTableName(mapping string) string {
	if i := strings.LastIndexByte(mapping, '_'); i >= 0 {
		return mapping[:i]
	}
	return mapping
}
// loadTables returns the tables currently present in the database, as reported
// by ".show tables | project TableName".
//
// On any error (issuing the command, iterating the result, or decoding a row)
// it returns a nil slice together with the error, never a partial list.
func (s *Syncer) loadTables(ctx context.Context) ([]Table, error) {
	stmt := kusto.NewStmt(".show tables | project TableName")
	rows, err := s.KustoCli.Mgmt(ctx, s.database, stmt)
	if err != nil {
		return nil, err
	}
	// Release the iterator on every exit path.
	defer rows.Stop()

	var tables []Table
	for {
		row, inlineErr, finalErr := rows.NextRowOrError()
		// io.EOF is the normal end of the result set and is checked first.
		if finalErr == io.EOF {
			return tables, nil
		}
		if inlineErr != nil {
			return nil, inlineErr
		}
		if finalErr != nil {
			return nil, finalErr
		}

		var v Table
		if err := row.ToStruct(&v); err != nil {
			return nil, err
		}
		tables = append(tables, v)
	}
}