in ingestor/adx/syncer.go [279:356]
func (s *Syncer) ensurePromMetricsFunctions(ctx context.Context) error {
// functions is the list of functions that we need to create in the database. They are executed in order.
functions := []struct {
name string
body string
}{
{
name: "prom_increase",
body: `.create-or-alter function prom_increase (T:(Timestamp:datetime, SeriesId: long, Labels:dynamic, Value:real), interval:timespan=1m) {
T
| where isnan(Value)==false
| extend h=SeriesId
| partition hint.strategy=shuffle by h (
as Series
| order by h, Timestamp asc
| extend prevVal=prev(Value)
| extend diff=Value-prevVal
| extend Value=case(h == prev(h), case(diff < 0, next(Value)-Value, diff), real(0))
| project-away prevVal, diff, h
)}`},
{
name: "prom_rate",
body: `.create-or-alter function prom_rate (T:(Timestamp:datetime, SeriesId: long, Labels:dynamic, Value:real), interval:timespan=1m) {
T
| invoke prom_increase(interval=interval)
| extend Value=Value/((Timestamp-prev(Timestamp))/1s)
| where isnotnull(Value)
| where isnan(Value) == false}`},
{
name: "prom_delta",
body: `.create-or-alter function prom_delta (T:(Timestamp:datetime, SeriesId: long, Labels:dynamic, Value:real), interval:timespan=1m) {
T
| where isnan(Value)==false
| extend h=SeriesId
| partition hint.strategy=shuffle by h (
as Series
| order by h, Timestamp asc
| extend prevVal=prev(Value)
| extend diff=Value-prevVal
| extend Value=case(h == prev(h), case(diff < 0, next(Value)-Value, diff), real(0))
| project-away prevVal, diff, h
)}`},
{
name: "CountCardinality",
body: `.create-or-alter function CountCardinality () {
union withsource=table *
| where Timestamp >= ago(1h) and Timestamp < ago(5m)
| summarize Value=toreal(dcount(SeriesId)) by table
| extend SeriesId=hash_xxhash64(table)
| extend Timestamp=bin(now(), 1m)
| extend Labels=bag_pack_columns(table)
| project Timestamp, SeriesId, Labels, Value
}`},
}
// This table is used to store the cardinality of all series in the database. It's updated by the CountCardinality function
// but we can't create the function unless a table exists.
stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(
".create table AdxmonIngestorTableCardinalityCount (Timestamp: datetime, SeriesId: long, Labels: dynamic, Value: real)")
_, err := s.KustoCli.Mgmt(ctx, s.database, stmt)
if err != nil {
return err
}
for _, fn := range functions {
logger.Infof("Creating function %s", fn.name)
stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(fn.body)
_, err := s.KustoCli.Mgmt(ctx, s.database, stmt)
if err != nil {
return err
}
}
return nil
}