func()

in internal/app/adapters/inbound/file/file_inbound.go [101:132]


func (f *FileInboundEndpoint) poll(ctx context.Context) error {
	interval, err := strconv.Atoi(f.config.Parameters["interval"])
	if err != nil {
		slog.Error("invalid interval value", "error", err)
		return fmt.Errorf("invalid interval value: %w", err)
	}
	ticker := f.clock.NewTicker(time.Duration(interval) * time.Millisecond)
	defer ticker.Stop()

	processingWg := &sync.WaitGroup{}

	for {
		select {
		case <-ctx.Done():
			slog.Info("received shutdown signal, stopping file polling")
			// Wait for all processing to complete before returning
			processingWg.Wait()
			return ctx.Err()

		case <-ticker.C:
			processingWg.Add(1)
			go func() {
				defer processingWg.Done()
				if err := f.processingCycle(ctx); err != nil {
					if err != context.Canceled {
						slog.Error("error in processing cycle", "error", err)
					}
				}
			}()
		}
	}
}