azkustodata/query/row_impl.go (229 lines of code) (raw):

package query import ( "encoding/csv" "github.com/Azure/azure-kusto-go/azkustodata/errors" "github.com/Azure/azure-kusto-go/azkustodata/types" "github.com/Azure/azure-kusto-go/azkustodata/value" "github.com/google/uuid" "github.com/shopspring/decimal" "reflect" "strings" "time" ) type row struct { columns Columns columnByName func(string) Column values value.Values ordinal int } func NewRow(t BaseTable, ordinal int, values value.Values) Row { return NewRowFromParts(t.Columns(), t.ColumnByName, ordinal, values) } func NewRowFromParts(c Columns, columnByName func(string) Column, ordinal int, values value.Values) Row { return &row{ columns: c, columnByName: columnByName, ordinal: ordinal, values: values, } } func (r *row) Columns() Columns { return r.columns } func (r *row) Index() int { return r.ordinal } func (r *row) Values() value.Values { return r.values } func (r *row) Value(i int) (value.Kusto, error) { if i < 0 || i >= len(r.values) { return nil, errors.ES(errors.OpTableAccess, errors.KClientArgs, "index %d out of range", i) } return r.values[i], nil } func (r *row) ValueByColumn(c Column) (value.Kusto, error) { return r.Value(c.Index()) } func (r *row) ValueByName(name string) (value.Kusto, error) { col := r.columnByName(name) if col == nil { return nil, columnNotFoundError(name) } return r.Value(col.Index()) } // ToStruct fetches the columns in a row into the fields of a struct. p must be a pointer to struct. // The rules for mapping a row's columns into a struct's exported fields are: // // 1. If a field has a `kusto: "column_name"` tag, then decode column // 'column_name' into the field. A special case is the `column_name: "-"` // tag, which instructs ToStruct to ignore the field during decoding. // // 2. Otherwise, if the name of a field matches the name of a column (ignoring case), // decode the column into the field. // // Slice and pointer fields will be set to nil if the source column is a null value, and a // non-nil value if the column is not NULL. To decode NULL values of other types, use // one of the kusto types (Int, Long, Dynamic, ...) as the type of the destination field. // You can check the .Valid field of those types to see if the value was set. func (r *row) ToStruct(p interface{}) error { // Check if p is a pointer to a struct if t := reflect.TypeOf(p); t == nil || t.Kind() != reflect.Ptr || t.Elem().Kind() != reflect.Struct { return errors.ES(errors.OpTableAccess, errors.KClientArgs, "type %T is not a pointer to a struct", p) } if len(r.Columns()) != len(r.Values()) { return errors.ES(errors.OpTableAccess, errors.KClientArgs, "row does not have the correct number of values(%d) for the number of columns(%d)", len(r.Values()), len(r.Columns())) } return decodeToStruct(r.Columns(), r.Values(), p) } // String implements fmt.Stringer for a Row. This simply outputs a CSV version of the row. func (r *row) String() string { var line []string for _, v := range r.Values() { line = append(line, v.String()) } b := &strings.Builder{} w := csv.NewWriter(b) err := w.Write(line) if err != nil { return "" } w.Flush() return b.String() } func conversionError(from string, to string) error { return errors.ES(errors.OpTableAccess, errors.KOther, "cannot convert %s to %s", from, to) } func columnNotFoundError(name string) error { return errors.ES(errors.OpTableAccess, errors.KOther, "column %s not found", name) } // contains all types *bool, etc type kustoTypeGeneric interface { *bool | *int32 | *int64 | *float64 | *decimal.Decimal | string | interface{} | *time.Time | *time.Duration } func byIndex[T kustoTypeGeneric](r *row, colType types.Column, i int, defaultValue T) (T, error) { val, err := r.Value(i) if err != nil { return defaultValue, err } if val.GetType() != colType { return defaultValue, conversionError(string(val.GetType()), string(colType)) } return val.GetValue().(T), nil } func byName[T kustoTypeGeneric](r *row, colType types.Column, name string, defaultValue T) (T, error) { col := r.columnByName(name) if col == nil { return defaultValue, columnNotFoundError(name) } return byIndex(r, colType, col.Index(), defaultValue) } func (r *row) BoolByIndex(i int) (*bool, error) { return byIndex(r, types.Bool, i, (*bool)(nil)) } func (r *row) IntByIndex(i int) (*int32, error) { return byIndex(r, types.Int, i, (*int32)(nil)) } func (r *row) LongByIndex(i int) (*int64, error) { return byIndex(r, types.Long, i, (*int64)(nil)) } func (r *row) RealByIndex(i int) (*float64, error) { return byIndex(r, types.Real, i, (*float64)(nil)) } func (r *row) DecimalByIndex(i int) (*decimal.Decimal, error) { return byIndex(r, types.Decimal, i, (*decimal.Decimal)(nil)) } func (r *row) StringByIndex(i int) (string, error) { return byIndex(r, types.String, i, "") } func (r *row) DynamicByIndex(i int) ([]byte, error) { return byIndex[[]byte](r, types.Dynamic, i, nil) } func (r *row) DateTimeByIndex(i int) (*time.Time, error) { return byIndex(r, types.DateTime, i, (*time.Time)(nil)) } func (r *row) TimespanByIndex(i int) (*time.Duration, error) { return byIndex(r, types.Timespan, i, (*time.Duration)(nil)) } func (r *row) GuidByIndex(i int) (*uuid.UUID, error) { return byIndex(r, types.GUID, i, (*uuid.UUID)(nil)) } func (r *row) BoolByName(name string) (*bool, error) { return byName(r, types.Bool, name, (*bool)(nil)) } func (r *row) IntByName(name string) (*int32, error) { return byName(r, types.Int, name, (*int32)(nil)) } func (r *row) LongByName(name string) (*int64, error) { return byName(r, types.Long, name, (*int64)(nil)) } func (r *row) RealByName(name string) (*float64, error) { return byName(r, types.Real, name, (*float64)(nil)) } func (r *row) DecimalByName(name string) (*decimal.Decimal, error) { return byName(r, types.Decimal, name, (*decimal.Decimal)(nil)) } func (r *row) StringByName(name string) (string, error) { return byName(r, types.String, name, "") } func (r *row) DynamicByName(name string) ([]byte, error) { return byName[[]byte](r, types.Dynamic, name, nil) } func (r *row) DateTimeByName(name string) (*time.Time, error) { return byName(r, types.DateTime, name, (*time.Time)(nil)) } func (r *row) TimespanByName(name string) (*time.Duration, error) { return byName(r, types.Timespan, name, (*time.Duration)(nil)) } func (r *row) GuidByName(name string) (*uuid.UUID, error) { return byName(r, types.GUID, name, (*uuid.UUID)(nil)) } // ToStructs converts a table, a non-iterative dataset or a slice of rows into a slice of structs. // If a dataset is provided, it should contain exactly one table. func ToStructs[T any](data interface{}) ([]T, error) { var rows []Row var errs error switch v := data.(type) { case Table: rows = v.Rows() case IterativeTable: full, err := v.ToTable() if err != nil { return nil, err } rows = full.Rows() case []Row: rows = v case Row: rows = []Row{v} case Dataset: tables := v.Tables() if len(tables) == 0 { return nil, errors.ES(errors.OpUnknown, errors.KInternal, "dataset does not contain any tables") } if !tables[0].IsPrimaryResult() { return nil, errors.ES(errors.OpUnknown, errors.KInternal, "dataset contains no primary results") } rows = tables[0].Rows() default: return nil, errors.ES(errors.OpUnknown, errors.KInternal, "invalid data type - expected Dataset, Table, BaseTable or []Row") } if rows == nil || len(rows) == 0 { return nil, errs } out := make([]T, len(rows)) for i, r := range rows { if err := r.ToStruct(&out[i]); err != nil { out = out[:i] if len(out) == 0 { out = nil } return out, err } } return out, errs } type StructResult[T any] struct { Out T Err error } func ToStructsIterative[T any](tb IterativeTable) chan StructResult[T] { out := make(chan StructResult[T]) go func() { defer close(out) for rowResult := range tb.Rows() { if rowResult.Err() != nil { out <- StructResult[T]{Err: rowResult.Err()} } else { var s T if err := rowResult.Row().ToStruct(&s); err != nil { out <- StructResult[T]{Err: err} } else { out <- StructResult[T]{Out: s} } } } }() return out }