ingestor/adx/fake.go (96 lines of code) (raw):
package adx
import (
"context"
"flag"
"io"
"os"
"testing"
"github.com/Azure/adx-mon/ingestor/cluster"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/data/table"
)
// fakeUploader is an Uploader test double that talks to no remote service.
// Batches received on its queue are handled locally by upload: each segment
// path is removed from disk and the batch is released.
type fakeUploader struct {
// queue is the channel handed out by UploadQueue and drained by upload.
queue chan *cluster.Batch
// closeFn cancels the context created in Open; it is nil until Open is called.
closeFn context.CancelFunc
// db is the database name reported by Database.
db string
}
// Mgmt returns a mocked row iterator whose schema is a single string column
// called "Name"; no rows are added to it. The ctx, query and options arguments
// are ignored.
//
// When the "test.v" flag is not yet registered, it is registered here first.
// NOTE(review): presumably this lets the kusto mock helpers be used outside
// of `go test` - confirm against the kusto package.
func (f *fakeUploader) Mgmt(ctx context.Context, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
	if inTest := isTest(); !inTest {
		flag.String("test.v", "true", "fake")
	}

	columns := table.Columns{
		{Name: "Name", Type: "string"},
	}
	mockRows, err := kusto.NewMockRows(columns)
	if err != nil {
		return nil, err
	}

	it := new(kusto.RowIterator)
	it.Mock(mockRows)
	return it, nil
}
// NewFakeUploader builds a fake Uploader that reports db as its database and
// owns a buffered upload queue. The uploader does not process the queue until
// Open is called.
func NewFakeUploader(db string) Uploader {
	// Capacity of the buffered upload queue.
	const queueDepth = 10000

	fake := new(fakeUploader)
	fake.queue = make(chan *cluster.Batch, queueDepth)
	fake.db = db
	return fake
}
// Open starts the background upload goroutine. The goroutine runs under a
// cancellable child of ctx whose cancel function is stored for Close.
// It always returns nil.
func (f *fakeUploader) Open(ctx context.Context) error {
	uploadCtx, cancel := context.WithCancel(ctx)
	f.closeFn = cancel

	go f.upload(uploadCtx)
	return nil
}
// Close cancels the context created by Open, which makes the upload goroutine
// return. Calling Close on an uploader that was never opened is a no-op rather
// than a nil function call. It always returns nil.
func (f *fakeUploader) Close() error {
	// closeFn is only populated by Open; guard against Close-before-Open.
	if f.closeFn == nil {
		return nil
	}
	f.closeFn()
	return nil
}
// UploadQueue exposes the channel on which batches are submitted to this
// uploader; the same channel is drained by upload once Open has been called.
func (f *fakeUploader) UploadQueue() chan *cluster.Batch {
	pending := f.queue
	return pending
}
// Database reports the database name this uploader was constructed with.
func (f *fakeUploader) Database() string {
	name := f.db
	return name
}
// Endpoint reports a fixed placeholder endpoint name.
func (f *fakeUploader) Endpoint() string {
	const endpoint = "fake"
	return endpoint
}
// upload drains the queue until ctx is cancelled or the queue is closed.
// For every batch received it logs each segment path, removes that path from
// disk (logging, but not propagating, any removal error) and then releases
// the batch.
func (f *fakeUploader) upload(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case batch, ok := <-f.queue:
			if !ok {
				// A closed queue would otherwise yield nil batches forever.
				return
			}
			if batch == nil {
				// Nothing to process; avoid dereferencing a nil batch.
				continue
			}
			for _, si := range batch.Segments {
				logger.Warnf("Uploading segment %s", si.Path)
				if err := os.RemoveAll(si.Path); err != nil {
					logger.Errorf("Failed to remove segment: %s", err.Error())
				}
			}
			batch.Release()
		}
	}
}
// fakeKustoMgmt is a test double that records the management query it is
// given so a test can compare it against an expected query via Verify.
type fakeKustoMgmt struct {
	// expectedQuery is the query text Verify checks for; empty disables the check.
	expectedQuery string
	// actualQuery is the text of the most recent query passed to Mgmt.
	actualQuery string
}
// Mgmt records the string form of query in actualQuery and returns a mocked
// row iterator. The mock has a single string column called "Name" and has
// io.EOF injected as its error. The ctx, db and options arguments are ignored.
func (f *fakeKustoMgmt) Mgmt(ctx context.Context, db string, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
	f.actualQuery = query.String()

	columns := table.Columns{
		{Name: "Name", Type: "string"},
	}
	mockRows, err := kusto.NewMockRows(columns)
	if err != nil {
		return nil, err
	}
	mockRows.Error(io.EOF)

	it := new(kusto.RowIterator)
	it.Mock(mockRows)
	return it, nil
}
// Verify reports a test error on t when an expected query was set and the
// recorded query differs from it. With no expected query it does nothing.
func (f *fakeKustoMgmt) Verify(t *testing.T) {
	want, got := f.expectedQuery, f.actualQuery
	if want == "" || got == want {
		return
	}
	t.Errorf("Expected query %s, got %s", want, got)
}
// isTest reports whether a flag named "test.v" is registered on the default
// flag set.
func isTest() bool {
	verbose := flag.Lookup("test.v")
	return verbose != nil
}