collector/logs/service.go (48 lines of code) (raw):
package logs
import (
"context"
"github.com/Azure/adx-mon/collector/logs/types"
"github.com/Azure/adx-mon/pkg/logger"
"github.com/Azure/adx-mon/pkg/service"
)
type Service struct {
Source service.Component
Transforms []types.Transformer
Sinks []types.Sink
cancel context.CancelFunc
}
func (s *Service) Open(ctx context.Context) error {
ctx, close := context.WithCancel(ctx)
s.cancel = close
// Start from end to front, so that we can close in reverse order.
for _, sink := range s.Sinks {
if err := sink.Open(ctx); err != nil {
return err
}
}
for i := len(s.Transforms) - 1; i >= 0; i-- {
if err := s.Transforms[i].Open(ctx); err != nil {
return err
}
}
if err := s.Source.Open(ctx); err != nil {
return err
}
return nil
}
func (s *Service) Close() error {
if err := s.Source.Close(); err != nil {
logger.Warnf("Failed to close source: %s", err)
}
s.cancel()
for _, transform := range s.Transforms {
if err := transform.Close(); err != nil {
logger.Warnf("Failed to close transform: %s", err)
}
}
for _, sink := range s.Sinks {
if err := sink.Close(); err != nil {
logger.Warnf("Failed to close sink: %s", err)
}
}
return nil
}