collector/logs/service.go (48 lines of code) (raw):

package logs

import (
	"context"

	"github.com/Azure/adx-mon/collector/logs/types"
	"github.com/Azure/adx-mon/pkg/logger"
	"github.com/Azure/adx-mon/pkg/service"
)

// Service bundles a log source with its transformers and sinks and manages
// their lifecycle as a single unit through Open and Close.
type Service struct {
	// Source is opened last by Open and closed first by Close.
	Source service.Component
	// Transforms are opened from the last element to the first and closed
	// from the first element to the last.
	Transforms []types.Transformer
	// Sinks are opened before any transform or the source and closed after
	// them.
	Sinks []types.Sink

	// cancel cancels the context handed to every component by Open. It is
	// nil until Open has been called.
	cancel context.CancelFunc
}

// Open derives a cancellable context from ctx and opens every component with
// it: all sinks first, then the transforms from last to first, and finally
// the source.
//
// The first error returned by a component aborts the sequence and is returned
// unchanged. In that case the derived context is cancelled before returning so
// that it is not leaked; components that were already opened are not closed
// here.
func (s *Service) Open(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	s.cancel = cancel

	if err := s.openComponents(ctx); err != nil {
		// Release the derived context. Calling cancel again from Close is
		// harmless because a CancelFunc is a no-op after its first call.
		cancel()
		return err
	}
	return nil
}

// openComponents opens the sinks, transforms, and source with ctx, stopping at
// the first error.
func (s *Service) openComponents(ctx context.Context) error {
	// Start from end to front, so that we can close in reverse order.
	for _, sink := range s.Sinks {
		if err := sink.Open(ctx); err != nil {
			return err
		}
	}

	for i := len(s.Transforms) - 1; i >= 0; i-- {
		if err := s.Transforms[i].Open(ctx); err != nil {
			return err
		}
	}

	return s.Source.Open(ctx)
}

// Close closes the source, cancels the context created by Open, and then
// closes each transform followed by each sink. Failures from individual
// components are logged as warnings and do not stop the remaining components
// from being closed. Close always returns nil.
func (s *Service) Close() error {
	if err := s.Source.Close(); err != nil {
		logger.Warnf("Failed to close source: %s", err)
	}

	// cancel is nil when Open was never called; guard so Close does not panic.
	if s.cancel != nil {
		s.cancel()
	}

	for _, transform := range s.Transforms {
		if err := transform.Close(); err != nil {
			logger.Warnf("Failed to close transform: %s", err)
		}
	}

	for _, sink := range s.Sinks {
		if err := sink.Close(); err != nil {
			logger.Warnf("Failed to close sink: %s", err)
		}
	}

	return nil
}