func NewService()

in ingestor/service.go [146:240]


// NewService builds a Service from opts by constructing and wiring together
// its components: a local segment store, a cluster coordinator, a health
// tracker, a replicator, a batcher, a metrics service and a scheduler.
//
// It returns a non-nil error (and a nil *Service) when the coordinator or the
// replicator cannot be constructed. opts.Uploader is dereferenced while
// building the batcher, so it must be set by the caller.
func NewService(opts ServiceOpts) (*Service, error) {
	store := storage.NewLocalStore(storage.StoreOpts{
		StorageDir:     opts.StorageDir,
		SegmentMaxSize: opts.MaxSegmentSize,
		SegmentMaxAge:  opts.MaxSegmentAge,
		EnableWALFsync: opts.EnableWALFsync,
	})

	// The coordinator writes incoming time series through the store and is
	// reused below as the partitioner, the elector and the scheduler input.
	coord, err := cluster.NewCoordinator(&cluster.CoordinatorOpts{
		WriteTimeSeriesFn: store.WriteTimeSeries,
		K8sCli:            opts.K8sCli,
		Hostname:          opts.Hostname,
		Namespace:         opts.Namespace,
		PartitionSize:     opts.PartitionSize,
	})
	if err != nil {
		return nil, err
	}

	health := cluster.NewHealth(cluster.HealthOpts{
		UnhealthyTimeout: time.Minute,
		MaxSegmentCount:  opts.MaxSegmentCount,
		MaxDiskUsage:     opts.MaxDiskUsage,
	})

	repl, err := cluster.NewReplicator(cluster.ReplicatorOpts{
		Hostname:               opts.Hostname,
		Partitioner:            coord,
		InsecureSkipVerify:     opts.InsecureSkipVerify,
		Health:                 health,
		SegmentRemover:         store,
		MaxTransferConcurrency: opts.MaxTransferConcurrency,
		DisableGzip:            true,
	})
	if err != nil {
		return nil, err
	}

	batcher := cluster.NewBatcher(cluster.BatcherOpts{
		StorageDir:         opts.StorageDir,
		MaxSegmentAge:      opts.MaxSegmentAge,
		MaxTransferSize:    opts.MaxTransferSize,
		MaxTransferAge:     opts.MaxTransferAge,
		MaxBatchSegments:   opts.MaxBatchSegments,
		Partitioner:        coord,
		Segmenter:          store.Index(),
		UploadQueue:        opts.Uploader.UploadQueue(),
		TransferQueue:      repl.TransferQueue(),
		PeerHealthReporter: health,
		TransfersDisabled:  opts.DisablePeerTransfer,
	})

	// health and batcher reference each other: the batcher receives health at
	// construction time, so the reverse link can only be set afterwards.
	health.QueueSizer = batcher

	// KustoCli gets every client (metrics first, then logs), while
	// MetricsKustoCli below is passed the metrics clients only.
	allKustoCli := make([]metrics.StatementExecutor, 0, len(opts.MetricsKustoCli)+len(opts.LogsKustoCli))
	allKustoCli = append(allKustoCli, opts.MetricsKustoCli...)
	allKustoCli = append(allKustoCli, opts.LogsKustoCli...)

	metricsSvc := metrics.NewService(metrics.ServiceOpts{
		Hostname:         opts.Hostname,
		Elector:          coord,
		MetricsKustoCli:  opts.MetricsKustoCli,
		KustoCli:         allKustoCli,
		PeerHealthReport: health,
	})

	// Set of every configured database name, logs and metrics combined.
	// The capacity hint is an upper bound; duplicate names collapse.
	// NOTE(review): opts.AllowedDatabase is not consulted here; it is only
	// carried along inside the stored opts — confirm that is intended.
	databases := make(map[string]struct{}, len(opts.LogsDatabases)+len(opts.MetricsDatabases))
	for _, db := range opts.LogsDatabases {
		databases[db] = struct{}{}
	}
	for _, db := range opts.MetricsDatabases {
		databases[db] = struct{}{}
	}

	sched := scheduler.NewScheduler(coord)

	return &Service{
		opts:             opts,
		databases:        databases,
		uploader:         opts.Uploader,
		replicator:       repl,
		store:            store,
		coordinator:      coord,
		batcher:          batcher,
		metrics:          metricsSvc,
		health:           health,
		scheduler:        sched,
		dropFilePrefixes: opts.DropFilePrefixes,
	}, nil
}