ingestor/cluster/fake.go (62 lines of code) (raw):
package cluster
import (
"context"
"time"
"github.com/Azure/adx-mon/pkg/logger"
)
type FakeReplicator struct {
cancelFn context.CancelFunc
queue chan *Batch
}
func NewFakeReplicator() *FakeReplicator {
return &FakeReplicator{
queue: make(chan *Batch, 10000),
}
}
func (f *FakeReplicator) Open(ctx context.Context) error {
ctx, f.cancelFn = context.WithCancel(ctx)
go f.replicate(ctx)
return nil
}
func (f *FakeReplicator) Close() error {
f.cancelFn()
return nil
}
func (f *FakeReplicator) replicate(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
}
batch := <-f.queue
for _, seg := range batch.Segments {
if logger.IsDebug() {
logger.Debugf("Transferred %s", seg.Path)
}
}
if err := batch.Remove(); err != nil {
logger.Errorf("Failed to remove batch: %s", err.Error())
}
batch.Release()
}
}
func (f *FakeReplicator) TransferQueue() chan *Batch {
return f.queue
}
type fakeSegmentRemover struct {
}
func (f fakeSegmentRemover) Remove(path string) error {
return nil
}
type fakeBatcher struct{}
func (f fakeBatcher) Open(ctx context.Context) error { return nil }
func (f fakeBatcher) Close() error { return nil }
func (f fakeBatcher) BatchSegments() error { return nil }
func (f fakeBatcher) UploadQueueSize() int { return 0 }
func (f fakeBatcher) TransferQueueSize() int { return 0 }
func (f fakeBatcher) SegmentsTotal() int64 { return 0 }
func (f fakeBatcher) SegmentsSize() int64 { return 0 }
func (f fakeBatcher) Release(batch *Batch) {}
func (f fakeBatcher) Remove(batch *Batch) error { return nil }
func (f fakeBatcher) MaxSegmentAge() time.Duration { return 0 }