azkustodata/query/v2/iterative_table.go (76 lines of code) (raw):
package v2
import (
"context"
"github.com/Azure/azure-kusto-go/azkustodata/query"
"sync/atomic"
)
// iterativeTable represents a table that is streamed from the service.
// It is used by the iterative dataset.
// The rows are received from the service via the rawRows channel, and are parsed and sent to the rows channel.
type iterativeTable struct {
query.BaseTable
// a channel of rows and errors, exposed to the user
rows chan query.RowResult
// the number of rows in the table, updated as rows are received
rowCount atomic.Uint32
// a context for the table
ctx context.Context
}
// addRawRows is called by the dataset to add rows to the table.
// It will add the rows to the table, unless the table is already skipped.
func (t *iterativeTable) addRawRows(rows []query.Row) {
for _, row := range rows {
if !t.reportRow(row) {
return
}
t.rowCount.Add(1)
}
}
// RowCount returns the current number of rows in the table.
func (t *iterativeTable) RowCount() int {
return int(t.rowCount.Load())
}
func (t *iterativeTable) setRowCount(rowCount int) {
t.rowCount.Store(uint32(rowCount))
}
func NewIterativeTable(dataset *iterativeDataset, th TableHeader) (query.IterativeTable, error) {
baseTable, err := newBaseTableFromHeader(dataset, th)
if err != nil {
return nil, err
}
t := &iterativeTable{
BaseTable: baseTable,
ctx: dataset.Context(),
rows: make(chan query.RowResult, dataset.rowCapacity),
}
return t, nil
}
func (t *iterativeTable) finishTable(errs []OneApiError, cancelError error) {
if cancelError != nil {
t.reportError(cancelError)
} else if errs != nil {
t.reportError(combineOneApiErrors(errs))
}
close(t.rows)
}
func (t *iterativeTable) reportRow(row query.Row) bool {
select {
case t.rows <- query.RowResultSuccess(row):
return true
case <-t.ctx.Done():
return false
}
}
func (t *iterativeTable) reportError(err error) bool {
select {
case t.rows <- query.RowResultError(err):
return true
case <-t.ctx.Done():
return false
}
}
// Rows returns a channel of rows and errors.
func (t *iterativeTable) Rows() <-chan query.RowResult {
return t.rows
}
// ToTable reads the entire table, converting it from an iterative table to a regular table.
func (t *iterativeTable) ToTable() (query.Table, error) {
var rows []query.Row
for r := range t.rows {
if r.Err() != nil {
return nil, r.Err()
} else {
rows = append(rows, r.Row())
}
}
return query.NewTable(t.BaseTable, rows), nil
}