ingestor/adx/dispatcher.go (72 lines of code) (raw):
package adx
import (
"context"
"github.com/Azure/adx-mon/ingestor/cluster"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/azure-kusto-go/kusto"
)
type dispatcher struct {
uploaders map[string]Uploader
queue chan *cluster.Batch
cancel context.CancelFunc
}
func NewDispatcher(uploaders []Uploader) *dispatcher {
d := &dispatcher{
uploaders: make(map[string]Uploader),
queue: make(chan *cluster.Batch, 10000),
}
for _, u := range uploaders {
logger.Infof("Registering uploader for database %s", u.Database())
d.uploaders[u.Database()] = u
}
return d
}
func (d *dispatcher) Open(ctx context.Context) error {
c, cancel := context.WithCancel(ctx)
d.cancel = cancel
for _, u := range d.uploaders {
if err := u.Open(c); err != nil {
return err
}
}
go d.upload(c)
return nil
}
func (d *dispatcher) Close() error {
d.cancel()
for _, u := range d.uploaders {
u.Close()
}
return nil
}
func (d *dispatcher) UploadQueue() chan *cluster.Batch {
return d.queue
}
func (d *dispatcher) Database() string {
return ""
}
func (d *dispatcher) Endpoint() string {
return ""
}
func (d *dispatcher) Mgmt(ctx context.Context, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error) {
// Not implemented. Should this fanout to all uploaders?
return nil, nil
}
func (d *dispatcher) upload(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case batch := <-d.queue:
u, ok := d.uploaders[batch.Database]
if !ok {
logger.Errorf("No uploader for database %s", batch.Database)
continue
}
select {
case u.UploadQueue() <- batch:
default:
logger.Errorf("Failed to queue batch for %s. Queue is full: %d/%d", batch.Database, len(u.UploadQueue()), cap(u.UploadQueue()))
}
}
}
}