in ingestor/service.go [146:240]
func NewService(opts ServiceOpts) (*Service, error) {
store := storage.NewLocalStore(storage.StoreOpts{
StorageDir: opts.StorageDir,
SegmentMaxSize: opts.MaxSegmentSize,
SegmentMaxAge: opts.MaxSegmentAge,
EnableWALFsync: opts.EnableWALFsync,
})
coord, err := cluster.NewCoordinator(&cluster.CoordinatorOpts{
WriteTimeSeriesFn: store.WriteTimeSeries,
K8sCli: opts.K8sCli,
Hostname: opts.Hostname,
Namespace: opts.Namespace,
PartitionSize: opts.PartitionSize,
})
if err != nil {
return nil, err
}
health := cluster.NewHealth(cluster.HealthOpts{
UnhealthyTimeout: time.Minute,
MaxSegmentCount: opts.MaxSegmentCount,
MaxDiskUsage: opts.MaxDiskUsage,
})
repl, err := cluster.NewReplicator(cluster.ReplicatorOpts{
Hostname: opts.Hostname,
Partitioner: coord,
InsecureSkipVerify: opts.InsecureSkipVerify,
Health: health,
SegmentRemover: store,
MaxTransferConcurrency: opts.MaxTransferConcurrency,
DisableGzip: true,
})
if err != nil {
return nil, err
}
batcher := cluster.NewBatcher(cluster.BatcherOpts{
StorageDir: opts.StorageDir,
MaxSegmentAge: opts.MaxSegmentAge,
MaxTransferSize: opts.MaxTransferSize,
MaxTransferAge: opts.MaxTransferAge,
MaxBatchSegments: opts.MaxBatchSegments,
Partitioner: coord,
Segmenter: store.Index(),
UploadQueue: opts.Uploader.UploadQueue(),
TransferQueue: repl.TransferQueue(),
PeerHealthReporter: health,
TransfersDisabled: opts.DisablePeerTransfer,
})
health.QueueSizer = batcher
allKustoCli := make([]metrics.StatementExecutor, 0, len(opts.MetricsKustoCli)+len(opts.LogsKustoCli))
allKustoCli = append(allKustoCli, opts.MetricsKustoCli...)
allKustoCli = append(allKustoCli, opts.LogsKustoCli...)
metricsSvc := metrics.NewService(metrics.ServiceOpts{
Hostname: opts.Hostname,
Elector: coord,
MetricsKustoCli: opts.MetricsKustoCli,
KustoCli: allKustoCli,
PeerHealthReport: health,
})
dbs := make(map[string]struct{}, len(opts.AllowedDatabase))
for _, db := range opts.AllowedDatabase {
dbs[db] = struct{}{}
}
databases := make(map[string]struct{})
for _, db := range opts.LogsDatabases {
databases[db] = struct{}{}
}
for _, db := range opts.MetricsDatabases {
databases[db] = struct{}{}
}
sched := scheduler.NewScheduler(coord)
return &Service{
opts: opts,
databases: databases,
uploader: opts.Uploader,
replicator: repl,
store: store,
coordinator: coord,
batcher: batcher,
metrics: metricsSvc,
health: health,
scheduler: sched,
dropFilePrefixes: opts.DropFilePrefixes,
}, nil
}