in internal/app/adapters/inbound/file/file_inbound.go [101:132]
func (f *FileInboundEndpoint) poll(ctx context.Context) error {
interval, err := strconv.Atoi(f.config.Parameters["interval"])
if err != nil {
slog.Error("invalid interval value", "error", err)
return fmt.Errorf("invalid interval value: %w", err)
}
ticker := f.clock.NewTicker(time.Duration(interval) * time.Millisecond)
defer ticker.Stop()
processingWg := &sync.WaitGroup{}
for {
select {
case <-ctx.Done():
slog.Info("received shutdown signal, stopping file polling")
// Wait for all processing to complete before returning
processingWg.Wait()
return ctx.Err()
case <-ticker.C:
processingWg.Add(1)
go func() {
defer processingWg.Done()
if err := f.processingCycle(ctx); err != nil {
if err != context.Canceled {
slog.Error("error in processing cycle", "error", err)
}
}
}()
}
}
}