pkg/testutils/uploader.go (39 lines of code) (raw):

package testutils import ( "context" "errors" "fmt" "io" "github.com/Azure/azure-kusto-go/kusto" "github.com/Azure/azure-kusto-go/kusto/ingest" "github.com/Azure/azure-kusto-go/kusto/kql" ) type Uploader struct { client *kusto.Client database string table string } // NewUploadReader implements ingest.Ingestor func NewUploadReader(client *kusto.Client, database string, table string) *Uploader { return &Uploader{ client: client, database: database, table: table, } } func (u *Uploader) Close() error { return nil } func (u *Uploader) FromFile(ctx context.Context, fPath string, options ...ingest.FileOption) (*ingest.Result, error) { return nil, errors.New("not implemented") } func (u *Uploader) FromReader(ctx context.Context, reader io.Reader, options ...ingest.FileOption) (*ingest.Result, error) { // Kustainer is not able to using a streaming ingestor as there is no storage containers backing the Kusto cluster. // We must instead ingest inline and thus consume the reader. data, err := io.ReadAll(reader) if err != nil { return nil, fmt.Errorf("failed to read data: %w", err) } stmt := kql.New(".ingest inline into table ").AddTable(u.table).AddLiteral(" <| ").AddUnsafe(string(data)) if _, err = u.client.Mgmt(ctx, u.database, stmt); err != nil { return nil, fmt.Errorf("failed to ingest data: %w", err) } return &ingest.Result{}, nil }