// func NewService() — in collector/service.go [184:359]

// NewService constructs a collector Service from opts. It applies defaults
// for any unset limits (segment age/size/count, disk usage), builds the local
// segment store, wires the ingestion handlers (Prometheus remote-write, OTLP
// metrics over HTTP/gRPC, log collection), and configures either a real
// replicator (when opts.Endpoint is set) or a no-op fake one.
func NewService(opts *ServiceOpts) (*Service, error) {
	// Apply defaults for any tuning knobs the caller left at their zero value.
	maxSegmentAge := 30 * time.Second
	if opts.MaxSegmentAge > 0 {
		maxSegmentAge = opts.MaxSegmentAge
	}

	maxSegmentSize := int64(1024 * 1024)
	if opts.MaxSegmentSize > 0 {
		maxSegmentSize = opts.MaxSegmentSize
	}

	maxSegmentCount := int64(10000)
	if opts.MaxSegmentCount > 0 {
		maxSegmentCount = opts.MaxSegmentCount
	}

	maxDiskUsage := int64(10 * 1024 * 1024 * 1024) // 10 GB
	if opts.MaxDiskUsage > 0 {
		maxDiskUsage = opts.MaxDiskUsage
	}

	health := cluster.NewHealth(cluster.HealthOpts{
		UnhealthyTimeout: time.Minute,
		MaxSegmentCount:  maxSegmentCount,
		MaxDiskUsage:     maxDiskUsage,
	})

	store := storage.NewLocalStore(storage.StoreOpts{
		StorageDir:     opts.StorageDir,
		SegmentMaxAge:  maxSegmentAge,
		SegmentMaxSize: maxSegmentSize,
		// Use the defaulted value so the store and the health checker agree.
		// (Previously this passed opts.MaxDiskUsage directly, bypassing the
		// 10 GB default applied above.)
		MaxDiskUsage:     maxDiskUsage,
		LiftedLabels:     opts.LiftLabels,
		LiftedAttributes: opts.LiftAttributes,
		LiftedResources:  opts.LiftResources,
		WALFlushInterval: opts.WALFlushInterval,
	})

	var httpHandlers []*http.HttpHandler
	var grpcHandlers []*http.GRPCHandler
	workerSvcs := []service.Component{}

	// Prometheus remote-write style metrics handlers: each proxies to the
	// configured remote-write clients plus the local store.
	for _, handlerOpts := range opts.PromMetricsHandlers {
		metricsProxySvc := metricsHandler.NewHandler(metricsHandler.HandlerOpts{
			Path:               handlerOpts.Path,
			RequestTransformer: handlerOpts.MetricOpts.RequestTransformer(),
			RequestWriters:     append(handlerOpts.MetricOpts.RemoteWriteClients, &StoreRequestWriter{store}),
			HealthChecker:      health,
		})
		httpHandlers = append(httpHandlers, &http.HttpHandler{
			Path:    handlerOpts.Path,
			Handler: metricsProxySvc.HandleReceive,
		})
	}

	// OTLP metrics handlers: each may expose an HTTP path, a gRPC port, or both.
	for _, handlerOpts := range opts.OtlpMetricsHandlers {
		writer := otlp.NewOltpMetricWriter(otlp.OltpMetricWriterOpts{
			RequestTransformer:       handlerOpts.MetricOpts.RequestTransformer(),
			Clients:                  append(handlerOpts.MetricOpts.RemoteWriteClients, &StoreRemoteClient{store}),
			MaxBatchSize:             opts.MaxBatchSize,
			DisableMetricsForwarding: handlerOpts.MetricOpts.DisableMetricsForwarding,
			HealthChecker:            health,
		})
		oltpMetricsService := otlp.NewMetricsService(writer, handlerOpts.Path, handlerOpts.GrpcPort)
		if handlerOpts.Path != "" {
			httpHandlers = append(httpHandlers, &http.HttpHandler{
				Path:    handlerOpts.Path,
				Handler: oltpMetricsService.Handler,
			})
		}

		if handlerOpts.GrpcPort > 0 {
			path, handler := metricsv1connect.NewMetricsServiceHandler(oltpMetricsService)

			grpcHandlers = append(grpcHandlers, &http.GRPCHandler{
				Port:    handlerOpts.GrpcPort,
				Path:    path,
				Handler: handler,
			})
		}
	}

	var (
		replicator    service.Component
		transferQueue chan *cluster.Batch
		partitioner   cluster.MetricPartitioner
	)
	if opts.Endpoint != "" {
		// This is a static partitioner that forces all entries to be assigned to the remote endpoint.
		partitioner = remotePartitioner{
			host: "remote",
			addr: opts.Endpoint,
		}

		r, err := cluster.NewReplicator(cluster.ReplicatorOpts{
			Hostname:               opts.NodeName,
			Partitioner:            partitioner,
			Health:                 health,
			SegmentRemover:         store,
			InsecureSkipVerify:     opts.InsecureSkipVerify,
			MaxTransferConcurrency: opts.MaxTransferConcurrency,
			DisableGzip:            opts.DisableGzip,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to create replicator: %w", err)
		}
		transferQueue = r.TransferQueue()
		replicator = r
	} else {
		// No endpoint configured: use a placeholder partitioner and a fake
		// replicator that drains the transfer queue without sending anything.
		partitioner = remotePartitioner{
			host: "remote",
			addr: "http://remotehost:1234",
		}

		r := cluster.NewFakeReplicator()
		transferQueue = r.TransferQueue()
		replicator = r
	}

	batcher := cluster.NewBatcher(cluster.BatcherOpts{
		StorageDir:       opts.StorageDir,
		MaxSegmentAge:    time.Minute,
		Partitioner:      partitioner,
		Segmenter:        store.Index(),
		MinUploadSize:    4 * 1024 * 1024,
		MaxBatchSegments: opts.MaxBatchSegments,
		// NOTE(review): upload and transfer share the same queue here —
		// presumably intentional for the collector, which only transfers to
		// the remote endpoint; confirm against cluster.NewBatcher semantics.
		UploadQueue:        transferQueue,
		TransferQueue:      transferQueue,
		PeerHealthReporter: health,
	})

	health.QueueSizer = batcher

	var scraper *Scraper
	if opts.Scraper != nil {
		scraperOpts := opts.Scraper
		scraperOpts.RemoteClients = append(scraperOpts.RemoteClients, &StoreRemoteClient{store})
		scraperOpts.HealthChecker = health

		// Pass the mutated options explicitly (previously opts.Scraper was
		// passed, which only worked because it aliases scraperOpts).
		scraper = NewScraper(scraperOpts)
	}

	for _, handlerOpts := range opts.LogCollectionHandlers {
		svc, err := handlerOpts.Create(store)
		if err != nil {
			return nil, fmt.Errorf("failed to create log collection service: %w", err)
		}

		workerSvcs = append(workerSvcs, svc)
	}

	for _, handlerOpts := range opts.HttpLogCollectorOpts {
		svc, httpHandler, err := handlerOpts.CreateHTTPSvc(store, health)
		if err != nil {
			return nil, fmt.Errorf("failed to create log collection service: %w", err)
		}
		workerSvcs = append(workerSvcs, svc)
		httpHandlers = append(httpHandlers, httpHandler)
	}

	svc := &Service{
		opts: opts,
		metricsSvc: metrics.NewService(metrics.ServiceOpts{
			PeerHealthReport: health,
		}),
		store:        store,
		scraper:      scraper,
		workerSvcs:   workerSvcs,
		httpHandlers: httpHandlers,
		grpcHandlers: grpcHandlers,
		batcher:      batcher,
		replicator:   replicator,
	}

	return svc, nil
}