pkg/testutils/kql_verify.go (178 lines of code) (raw):

package testutils import ( "context" "fmt" "io" "testing" "github.com/Azure/azure-kusto-go/kusto" "github.com/Azure/azure-kusto-go/kusto/kql" "github.com/stretchr/testify/require" ) type TableSchema interface { TableName() string CslColumns() []string } func TableExists(ctx context.Context, t *testing.T, database, table, uri string) bool { t.Helper() cb := kusto.NewConnectionStringBuilder(uri) client, err := kusto.New(cb) require.NoError(t, err) defer client.Close() stmt := kql.New(".show tables") rows, err := client.Mgmt(ctx, database, stmt) require.NoError(t, err) defer rows.Stop() for { row, errInline, errFinal := rows.NextRowOrError() if errFinal == io.EOF { break } if errInline != nil { t.Logf("Partial failure to retrieve tables: %v", errInline) continue } if errFinal != nil { t.Errorf("Failed to retrieve tables: %v", errFinal) } var tbl Table if err := row.ToStruct(&tbl); err != nil { t.Errorf("Failed to convert row to struct: %v", err) continue } if tbl.TableName == table { return true } } return false } type Table struct { TableName string `kusto:"TableName"` DatabaseName string `kusto:"DatabaseName"` Folder string `kusto:"Folder"` DocString string `kusto:"DocString"` } func FunctionExists(ctx context.Context, t *testing.T, database, function, uri string) bool { t.Helper() fn := GetFunction(ctx, t, database, function, uri) return fn.Name == function } func GetFunction(ctx context.Context, t *testing.T, database, function, uri string) Function { t.Helper() cb := kusto.NewConnectionStringBuilder(uri) client, err := kusto.New(cb) require.NoError(t, err) defer client.Close() stmt := kql.New(".show functions") rows, err := client.Mgmt(ctx, database, stmt) require.NoError(t, err) defer rows.Stop() for { row, errInline, errFinal := rows.NextRowOrError() if errFinal == io.EOF { break } if errInline != nil { t.Logf("Partial failure to retrieve functions: %v", errInline) continue } if errFinal != nil { t.Errorf("Failed to retrieve functions: %v", errFinal) } var fn Function if err := row.ToStruct(&fn); err != nil { t.Errorf("Failed to convert row to struct: %v", err) continue } if fn.Name == function { return fn } } return Function{} } type Function struct { Name string `kusto:"Name"` Parameters string `kusto:"Parameters"` Body string `kusto:"Body"` Folder string `kusto:"Folder"` DocString string `kusto:"DocString"` } func TableHasRows(ctx context.Context, t *testing.T, database, table, uri string) bool { t.Helper() cb := kusto.NewConnectionStringBuilder(uri) client, err := kusto.New(cb) require.NoError(t, err) defer client.Close() query := kql.New("").AddUnsafe(table).AddLiteral(" | count") rows, err := client.Query(ctx, database, query) require.NoError(t, err) defer rows.Stop() for { row, errInline, errFinal := rows.NextRowOrError() if errFinal == io.EOF { break } if errInline != nil { t.Logf("Partial failure to retrieve row count: %v", errInline) continue } if errFinal != nil { t.Errorf("Failed to retrieve row count: %v", errFinal) } var count RowCount if err := row.ToStruct(&count); err != nil { t.Errorf("Failed to convert row to struct: %v", err) continue } return count.Count > 0 } return false } type RowCount struct { Count int64 `kusto:"Count"` } func VerifyTableSchema(ctx context.Context, t *testing.T, database, table, uri string, expect TableSchema) { t.Helper() cb := kusto.NewConnectionStringBuilder(uri) client, err := kusto.New(cb) require.NoError(t, err) defer client.Close() query := kql.New("").AddUnsafe(table).AddLiteral(" | getschema") rows, err := client.Query(ctx, database, query) require.NoError(t, err) defer rows.Stop() var schema []*KqlSchema for { row, errInline, errFinal := rows.NextRowOrError() if errFinal == io.EOF { break } if errInline != nil { t.Logf("Partial failure to retrieve schema: %v", errInline) continue } if errFinal != nil { t.Errorf("Failed to retrieve schema: %v", errFinal) } var s KqlSchema if err := row.ToStruct(&s); err != nil { t.Errorf("Failed to convert row to struct: %v", err) continue } schema = append(schema, &s) } require.Equal(t, expect.TableName(), table) require.Equal(t, cslSchemaFromKqlSchema(schema), expect.CslColumns()) } type KqlSchema struct { ColumnName string `kusto:"ColumnName"` ColumnOrdinal int `kusto:"ColumnOrdinal"` DataType string `kusto:"DataType"` ColumnType string `kusto:"ColumnType"` } func cslSchemaFromKqlSchema(k []*KqlSchema) []string { var s []string for _, col := range k { s = append(s, fmt.Sprintf("%s:%s", col.ColumnName, col.ColumnType)) } return s }