schema/schema.go (315 lines of code) (raw):

package schema

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"

	adxcsv "github.com/Azure/adx-mon/pkg/csv"
	"github.com/cespare/xxhash/v2"
)

var (
	// DefaultMetricsMapping is the package-level mapping produced by NewMetricsSchema.
	DefaultMetricsMapping = NewMetricsSchema()

	// DefaultLogsMapping is the package-level mapping produced by NewLogsSchema.
	DefaultLogsMapping = NewLogsSchema()
)

const (
	// max_adx_identifier_length caps how many bytes the normalize helpers emit per call.
	// The non-idiomatic name is kept because other files in the package may reference it.
	max_adx_identifier_length = 1024
)

// SchemaMapping is an ordered list of column mappings. A column's position in
// the slice matches the Ordinal assigned to it by the constructors in this file.
type SchemaMapping []CSVMapping

// CSVMapping describes one column: its name, its data type, and either the
// ordinal it is read from or a constant value to use for it.
type CSVMapping struct {
	Column     string `json:"Column"`
	DataType   string `json:"DataType"`
	Properties struct {
		Ordinal    string `json:"Ordinal,omitempty"`
		ConstValue string `json:"ConstValue,omitempty"`
	} `json:"Properties"`
}

// columnDef pairs a column name with its data type for table-driven schema construction.
type columnDef struct {
	name     string
	dataType string
}

// newOrdinalMapping returns a CSVMapping for column whose Ordinal is the
// decimal rendering of ordinal. ConstValue is left empty.
func newOrdinalMapping(column, dataType string, ordinal int) CSVMapping {
	m := CSVMapping{Column: column, DataType: dataType}
	m.Properties.Ordinal = strconv.Itoa(ordinal)
	return m
}

// newSchema builds a SchemaMapping from cols, numbering ordinals from zero in
// slice order. The result has no spare capacity.
func newSchema(cols []columnDef) SchemaMapping {
	mapping := make(SchemaMapping, 0, len(cols))
	for i, c := range cols {
		mapping = append(mapping, newOrdinalMapping(c.name, c.dataType, i))
	}
	return mapping
}

// NewMetricsSchema returns a fresh mapping with the four metric columns:
// Timestamp, SeriesId, Labels and Value.
func NewMetricsSchema() SchemaMapping {
	return newSchema([]columnDef{
		{"Timestamp", "datetime"},
		{"SeriesId", "long"},
		{"Labels", "dynamic"},
		{"Value", "real"},
	})
}

// NewLogsSchema returns a fresh mapping with the nine log columns.
// See https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition
func NewLogsSchema() SchemaMapping {
	return newSchema([]columnDef{
		{"Timestamp", "datetime"},
		{"ObservedTimestamp", "datetime"},
		{"TraceId", "string"},
		{"SpanId", "string"},
		{"SeverityText", "string"},
		{"SeverityNumber", "int"},
		{"Body", "dynamic"},
		{"Resource", "dynamic"},
		{"Attributes", "dynamic"},
	})
}

// AddConstMapping returns a copy of m extended with a string column named col
// whose ConstValue is value and whose Ordinal is the current length of m.
// The receiver is never modified.
func (m SchemaMapping) AddConstMapping(col, value string) SchemaMapping {
	cm := newOrdinalMapping(col, "string", len(m))
	cm.Properties.ConstValue = value

	// The full slice expression pins capacity to length so append must copy.
	// Without it, two mappings derived from the same base (for example the
	// package-level defaults) could share a backing array and overwrite each
	// other's appended column.
	return append(m[:len(m):len(m)], cm)
}

// AddStringMapping returns a copy of m extended with a string column whose
// name is col passed through NormalizeMetricName. No constant value is set.
func (m SchemaMapping) AddStringMapping(col string) SchemaMapping {
	return m.AddConstMapping(string(NormalizeMetricName([]byte(col))), "")
}

// isADXIdentifierByte reports whether c is an ASCII letter or digit.
// ADX appears to only accept ASCII letters/numbers.
func isADXIdentifierByte(c byte) bool {
	return c >= 'a' && c <= 'z' || c >= 'A' && c <= 'Z' || c >= '0' && c <= '9'
}

// upperASCII returns the upper-case form of c; bytes without one are returned unchanged.
func upperASCII(c byte) byte {
	return byte(unicode.ToUpper(rune(c)))
}

// NormalizeAdxIdentifier sanitizes a table or database name for use in ADX.
// It removes every byte that is not an ASCII letter or digit and truncates the
// result to max_adx_identifier_length bytes.
// This does not do ProperCase transformations for metrics, only removes invalid characters.
// See https://learn.microsoft.com/en-us/kusto/query/schema-entities/entity-names?view=microsoft-fabric#identifier-naming-rules
func NormalizeAdxIdentifier(s string) string {
	// Most common to not need normalization in this path. Only allocate if needed.
	invalid := 0
	for i := 0; i < len(s); i++ {
		if !isADXIdentifierByte(s[i]) {
			invalid++
		}
	}
	if invalid == 0 && len(s) <= max_adx_identifier_length {
		return s
	}

	size := len(s) - invalid
	if size > max_adx_identifier_length {
		size = max_adx_identifier_length
	}

	var b strings.Builder
	b.Grow(size)
	for i := 0; i < len(s) && b.Len() < max_adx_identifier_length; i++ {
		if isADXIdentifierByte(s[i]) {
			b.WriteByte(s[i])
		}
	}
	return b.String()
}

// AppendNormalizeAdxIdentifier sanitizes a table or database name for use in ADX and appends it to dst.
// At most max_adx_identifier_length bytes are appended per call, regardless of
// how much dst already holds.
// This does not do ProperCase transformations for metrics, only removes invalid characters.
// See https://learn.microsoft.com/en-us/kusto/query/schema-entities/entity-names?view=microsoft-fabric#identifier-naming-rules
func AppendNormalizeAdxIdentifier(dst, s []byte) []byte {
	appended := 0
	for _, c := range s {
		if !isADXIdentifierByte(c) {
			continue
		}
		dst = append(dst, c)
		appended++
		if appended >= max_adx_identifier_length {
			break
		}
	}
	return dst
}

// NormalizeMetricName converts a metrics name to a ProperCase table name.
// The result is returned in a newly allocated slice.
func NormalizeMetricName(s []byte) []byte {
	return AppendNormalizeMetricName(make([]byte, 0, len(s)), s)
}

// AppendNormalizeMetricName converts a metrics name to a ProperCase table name and appends it to dst.
//
// Bytes that are not ASCII letters or digits are dropped. A lower-case letter
// directly following a dropped byte is upper-cased, as is the very first byte
// of s. At most max_adx_identifier_length bytes are appended per call.
func AppendNormalizeMetricName(dst, s []byte) []byte {
	// Most common to require normalization in the metric path to transform
	// prom-style metrics to ProperCase table names.
	appended := 0
	for i := 0; i < len(s) && appended < max_adx_identifier_length; i++ {
		c := s[i]
		if !isADXIdentifierByte(c) {
			// Drop the separator. If a lower-case letter follows, consume it
			// now and emit it capitalized so the next word starts in upper case.
			if i+1 < len(s) && s[i+1] >= 'a' && s[i+1] <= 'z' {
				dst = append(dst, upperASCII(s[i+1]))
				appended++
				i++
			}
			continue
		}

		// Capitalize the first letter.
		if i == 0 {
			c = upperASCII(c)
		}
		dst = append(dst, c)
		appended++
	}
	return dst
}

// AppendCSVHeader appends a header line of the form "Col:Type,Col:Type,..."
// for mapping to dst, terminated by adxcsv.AppendNewLine, and returns the
// extended slice.
func AppendCSVHeader(dst []byte, mapping SchemaMapping) []byte {
	for i := range mapping {
		// Separate on the column index rather than len(dst): a caller-supplied
		// non-empty dst must not cause a leading comma, which would yield a
		// header that UnmarshalSchema rejects.
		if i > 0 {
			dst = append(dst, ',')
		}
		dst = append(dst, mapping[i].Column...)
		dst = append(dst, ':')
		dst = append(dst, mapping[i].DataType...)
	}
	return adxcsv.AppendNewLine(dst)
}

// UnmarshalSchema parses the first line of data as a "Col:Type,Col:Type,..."
// header and returns the corresponding mapping with ordinals assigned in field
// order. Anything after the first '\n' is ignored. An empty first line yields
// an empty, non-nil mapping. A field that does not consist of exactly one name
// and one type separated by ':' results in an error.
func UnmarshalSchema(data string) (SchemaMapping, error) {
	if idx := strings.IndexByte(data, '\n'); idx != -1 {
		data = data[:idx]
	}
	if len(data) == 0 {
		return SchemaMapping{}, nil
	}

	fields := strings.Split(data, ",")
	mapping := make(SchemaMapping, 0, len(fields))
	for i, field := range fields {
		nameType := strings.Split(field, ":")
		if len(nameType) != 2 {
			return nil, fmt.Errorf("invalid schema field: %s", data)
		}
		mapping = append(mapping, newOrdinalMapping(nameType[0], nameType[1], i))
	}
	return mapping, nil
}

// SchemaHash returns a 64-bit xxhash over the Column and DataType of every
// entry in mapping, in order. Ordinals and constant values do not contribute.
func SchemaHash(mapping SchemaMapping) uint64 {
	d := xxhash.New()
	for i := range mapping {
		// Digest.WriteString is documented to always return a nil error, and
		// it avoids the []byte copy that Write([]byte(s)) would make.
		_, _ = d.WriteString(mapping[i].Column)
		_, _ = d.WriteString(mapping[i].DataType)
	}
	return d.Sum64()
}