in collector/service.go [184:359]
// NewService builds a Service from opts by constructing and connecting the
// local store, the health tracker, the Prometheus/OTLP metric handlers, the
// log collection services, the replicator, the batcher and the optional
// scraper.
//
// Non-positive limits in opts fall back to defaults: MaxSegmentAge 30s,
// MaxSegmentSize 1 MiB, MaxSegmentCount 10000 and MaxDiskUsage 10 GiB.
//
// An error is returned (with a nil *Service) when the replicator or any log
// collection service cannot be created.
//
// opts must be non-nil. When opts.Scraper is set it is modified in place: the
// local store client and the health checker are attached to it. The
// RemoteWriteClients slices of the handler options are never written to; the
// store client is appended to a private copy.
func NewService(opts *ServiceOpts) (*Service, error) {
	maxSegmentAge := 30 * time.Second
	if opts.MaxSegmentAge > 0 {
		maxSegmentAge = opts.MaxSegmentAge
	}

	maxSegmentSize := int64(1024 * 1024)
	if opts.MaxSegmentSize > 0 {
		maxSegmentSize = opts.MaxSegmentSize
	}

	maxSegmentCount := int64(10000)
	if opts.MaxSegmentCount > 0 {
		maxSegmentCount = opts.MaxSegmentCount
	}

	maxDiskUsage := int64(10 * 1024 * 1024 * 1024) // 10 GB
	if opts.MaxDiskUsage > 0 {
		maxDiskUsage = opts.MaxDiskUsage
	}

	health := cluster.NewHealth(cluster.HealthOpts{
		UnhealthyTimeout: time.Minute,
		MaxSegmentCount:  maxSegmentCount,
		MaxDiskUsage:     maxDiskUsage,
	})

	store := storage.NewLocalStore(storage.StoreOpts{
		StorageDir:     opts.StorageDir,
		SegmentMaxAge:  maxSegmentAge,
		SegmentMaxSize: maxSegmentSize,
		// NOTE(review): the store receives the raw option while health uses the
		// defaulted maxDiskUsage; confirm that a zero value here is intended.
		MaxDiskUsage:     opts.MaxDiskUsage,
		LiftedLabels:     opts.LiftLabels,
		LiftedAttributes: opts.LiftAttributes,
		LiftedResources:  opts.LiftResources,
		WALFlushInterval: opts.WALFlushInterval,
	})

	var httpHandlers []*http.HttpHandler
	var grpcHandlers []*http.GRPCHandler
	workerSvcs := []service.Component{}

	// Prometheus remote-write style handlers: each one forwards to its
	// configured remote clients and additionally writes to the local store.
	for _, handlerOpts := range opts.PromMetricsHandlers {
		// Cap the slice so append always copies instead of writing into spare
		// capacity of a backing array that other handlers may share.
		clients := handlerOpts.MetricOpts.RemoteWriteClients
		clients = clients[:len(clients):len(clients)]

		metricsProxySvc := metricsHandler.NewHandler(metricsHandler.HandlerOpts{
			Path:               handlerOpts.Path,
			RequestTransformer: handlerOpts.MetricOpts.RequestTransformer(),
			RequestWriters:     append(clients, &StoreRequestWriter{store}),
			HealthChecker:      health,
		})
		httpHandlers = append(httpHandlers, &http.HttpHandler{
			Path:    handlerOpts.Path,
			Handler: metricsProxySvc.HandleReceive,
		})
	}

	// OTLP metric handlers: exposed over HTTP when a path is configured and
	// over gRPC when a port is configured (both may be enabled at once).
	for _, handlerOpts := range opts.OtlpMetricsHandlers {
		// Same aliasing protection as for the Prometheus handlers above.
		clients := handlerOpts.MetricOpts.RemoteWriteClients
		clients = clients[:len(clients):len(clients)]

		writer := otlp.NewOltpMetricWriter(otlp.OltpMetricWriterOpts{
			RequestTransformer:       handlerOpts.MetricOpts.RequestTransformer(),
			Clients:                  append(clients, &StoreRemoteClient{store}),
			MaxBatchSize:             opts.MaxBatchSize,
			DisableMetricsForwarding: handlerOpts.MetricOpts.DisableMetricsForwarding,
			HealthChecker:            health,
		})
		oltpMetricsService := otlp.NewMetricsService(writer, handlerOpts.Path, handlerOpts.GrpcPort)

		if handlerOpts.Path != "" {
			httpHandlers = append(httpHandlers, &http.HttpHandler{
				Path:    handlerOpts.Path,
				Handler: oltpMetricsService.Handler,
			})
		}

		if handlerOpts.GrpcPort > 0 {
			path, handler := metricsv1connect.NewMetricsServiceHandler(oltpMetricsService)
			grpcHandlers = append(grpcHandlers, &http.GRPCHandler{
				Port:    handlerOpts.GrpcPort,
				Path:    path,
				Handler: handler,
			})
		}
	}

	var (
		replicator    service.Component
		transferQueue chan *cluster.Batch
		partitioner   cluster.MetricPartitioner
	)
	if opts.Endpoint != "" {
		// This is a static partitioner that forces all entries to be assigned to the remote endpoint.
		partitioner = remotePartitioner{
			host: "remote",
			addr: opts.Endpoint,
		}

		r, err := cluster.NewReplicator(cluster.ReplicatorOpts{
			Hostname:               opts.NodeName,
			Partitioner:            partitioner,
			Health:                 health,
			SegmentRemover:         store,
			InsecureSkipVerify:     opts.InsecureSkipVerify,
			MaxTransferConcurrency: opts.MaxTransferConcurrency,
			DisableGzip:            opts.DisableGzip,
		})
		if err != nil {
			return nil, fmt.Errorf("failed to create replicator: %w", err)
		}
		transferQueue = r.TransferQueue()
		replicator = r
	} else {
		// No endpoint configured: use a fake replicator together with a
		// partitioner that points at a placeholder address.
		partitioner = remotePartitioner{
			host: "remote",
			addr: "http://remotehost:1234",
		}

		r := cluster.NewFakeReplicator()
		transferQueue = r.TransferQueue()
		replicator = r
	}

	// The replicator's queue serves as both the upload and the transfer queue.
	batcher := cluster.NewBatcher(cluster.BatcherOpts{
		StorageDir:         opts.StorageDir,
		MaxSegmentAge:      time.Minute,
		Partitioner:        partitioner,
		Segmenter:          store.Index(),
		MinUploadSize:      4 * 1024 * 1024,
		MaxBatchSegments:   opts.MaxBatchSegments,
		UploadQueue:        transferQueue,
		TransferQueue:      transferQueue,
		PeerHealthReporter: health,
	})

	// Health is created before the batcher exists, so the queue sizer is
	// attached afterwards.
	health.QueueSizer = batcher

	var scraper *Scraper
	if scraperOpts := opts.Scraper; scraperOpts != nil {
		// scraperOpts is the caller's struct; it is updated in place so the
		// scraper also writes to the local store and consults health.
		remoteClients := scraperOpts.RemoteClients
		remoteClients = remoteClients[:len(remoteClients):len(remoteClients)]
		scraperOpts.RemoteClients = append(remoteClients, &StoreRemoteClient{store})
		scraperOpts.HealthChecker = health
		scraper = NewScraper(scraperOpts)
	}

	for _, handlerOpts := range opts.LogCollectionHandlers {
		svc, err := handlerOpts.Create(store)
		if err != nil {
			return nil, fmt.Errorf("failed to create log collection service: %w", err)
		}
		workerSvcs = append(workerSvcs, svc)
	}

	// HTTP log collectors contribute both a worker service and an HTTP handler.
	for _, handlerOpts := range opts.HttpLogCollectorOpts {
		svc, httpHandler, err := handlerOpts.CreateHTTPSvc(store, health)
		if err != nil {
			return nil, fmt.Errorf("failed to create log collection service: %w", err)
		}
		workerSvcs = append(workerSvcs, svc)
		httpHandlers = append(httpHandlers, httpHandler)
	}

	svc := &Service{
		opts: opts,
		metricsSvc: metrics.NewService(metrics.ServiceOpts{
			PeerHealthReport: health,
		}),
		store:        store,
		scraper:      scraper,
		workerSvcs:   workerSvcs,
		httpHandlers: httpHandlers,
		grpcHandlers: grpcHandlers,
		batcher:      batcher,
		replicator:   replicator,
	}

	return svc, nil
}