// func() in ingestor/adx/syncer.go [279:356]


// ensurePromMetricsFunctions provisions the Kusto objects used for
// Prometheus-style queries in s.database. It first issues a .create table
// command for AdxmonIngestorTableCardinalityCount and then issues a
// .create-or-alter function command for prom_increase, prom_rate, prom_delta
// and CountCardinality, in that order. The first management command that
// fails stops the sequence and its error is returned as-is; nil is returned
// once every command has been accepted.
func (s *Syncer) ensurePromMetricsFunctions(ctx context.Context) error {
	// runMgmt wraps a raw management command in an unsafe statement and
	// executes it against s.database, discarding the result rows.
	runMgmt := func(command string) error {
		stmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(command)
		_, err := s.KustoCli.Mgmt(ctx, s.database, stmt)
		return err
	}

	// The cardinality table holds the series cardinality produced by the
	// CountCardinality function. It has to be created before the functions
	// because that function cannot be created unless a table exists.
	if err := runMgmt(".create table AdxmonIngestorTableCardinalityCount (Timestamp: datetime, SeriesId: long, Labels: dynamic, Value: real)"); err != nil {
		return err
	}

	// definitions holds the function DDL, applied in slice order. Order
	// matters: prom_rate invokes prom_increase, which is therefore listed
	// ahead of it.
	//
	// NOTE(review): the prom_delta body is identical to prom_increase apart
	// from the function name, and neither body reads its interval parameter
	// - confirm both are intended.
	definitions := []struct {
		label   string
		command string
	}{
		{
			label: "prom_increase",
			command: `.create-or-alter function prom_increase (T:(Timestamp:datetime, SeriesId: long, Labels:dynamic, Value:real), interval:timespan=1m) {
		T
		| where isnan(Value)==false
		| extend h=SeriesId
		| partition hint.strategy=shuffle by h (
			as Series
			| order by h, Timestamp asc
			| extend prevVal=prev(Value)
			| extend diff=Value-prevVal
			| extend Value=case(h == prev(h), case(diff < 0, next(Value)-Value, diff), real(0))
			| project-away prevVal, diff, h
		)}`,
		},
		{
			label: "prom_rate",
			command: `.create-or-alter function prom_rate (T:(Timestamp:datetime, SeriesId: long, Labels:dynamic, Value:real), interval:timespan=1m) {
		T
		| invoke prom_increase(interval=interval)
		| extend Value=Value/((Timestamp-prev(Timestamp))/1s)
		| where isnotnull(Value)
		| where isnan(Value) == false}`,
		},
		{
			label: "prom_delta",
			command: `.create-or-alter function prom_delta (T:(Timestamp:datetime, SeriesId: long, Labels:dynamic, Value:real), interval:timespan=1m) {
		T
		| where isnan(Value)==false
		| extend h=SeriesId
		| partition hint.strategy=shuffle by h (
			as Series
			| order by h, Timestamp asc
			| extend prevVal=prev(Value)
			| extend diff=Value-prevVal
			| extend Value=case(h == prev(h), case(diff < 0, next(Value)-Value, diff), real(0))
			| project-away prevVal, diff, h
		)}`,
		},
		{
			label: "CountCardinality",
			command: `.create-or-alter function CountCardinality () {
				union withsource=table *
				| where Timestamp >= ago(1h) and Timestamp < ago(5m)
				| summarize Value=toreal(dcount(SeriesId)) by table
				| extend SeriesId=hash_xxhash64(table)
				| extend Timestamp=bin(now(), 1m)
				| extend Labels=bag_pack_columns(table)
				| project Timestamp, SeriesId, Labels, Value
		}`,
		},
	}

	for i := range definitions {
		def := &definitions[i]
		logger.Infof("Creating function %s", def.label)
		if err := runMgmt(def.command); err != nil {
			return err
		}
	}
	return nil
}