func()

in internal/app/adapters/inbound/file/file_inbound.go [150:193]


// processFiles hands every path in files to f.processFile, either one at a
// time or concurrently.
//
// The mode comes from the optional "sequential" entry of f.config.Parameters,
// parsed with strconv.ParseBool. When the entry is absent, files are processed
// concurrently with one goroutine per file.
//
// Paths already marked in f.processingFiles are skipped; every other path is
// marked before it is dispatched. This function never clears a mark itself
// (presumably processFile does - confirm against its implementation).
//
// A failure to process an individual file is logged and does not abort the
// batch. The returned error is non-nil only when the "sequential" parameter
// cannot be parsed, or when ctx is cancelled before every file has been
// dispatched (ctx.Err() is returned in that case). In both modes the function
// returns only after all work it started has finished.
func (f *FileInboundEndpoint) processFiles(ctx context.Context, files []string) error {
	// Check if sequential processing is required
	sequential := false
	if val, exists := f.config.Parameters["sequential"]; exists {
		parsed, err := strconv.ParseBool(val)
		if err != nil {
			// An unparsable value is rejected, not silently defaulted, so the
			// message must not claim a fallback; keep the cause in the chain.
			return fmt.Errorf("invalid sequential value %q: must be true/false: %w", val, err)
		}
		sequential = parsed
	}

	var fileWg sync.WaitGroup
	// Runs on every return path, so no goroutine started below outlives this
	// call - on cancellation and on normal completion alike.
	defer fileWg.Wait()

	for _, file := range files {
		// Stop dispatching new files as soon as the context is cancelled.
		if err := ctx.Err(); err != nil {
			slog.Info("cancelling remaining file processing")
			return err
		}

		// Check if file is already being processed
		if _, exists := f.processingFiles.LoadOrStore(file, true); exists {
			slog.Debug("skipping file - already being processed", "file", file)
			continue
		}

		if sequential {
			if err := f.processFile(ctx, file); err != nil {
				slog.Error("failed to process file", "file", file, "error", err)
			}
			continue
		}

		fileWg.Add(1)
		// The path is passed as an argument so each goroutine owns its copy
		// regardless of the Go version's loop-variable semantics.
		go func(fileName string) {
			defer fileWg.Done()
			if err := f.processFile(ctx, fileName); err != nil {
				slog.Error("failed to process file", "file", fileName, "error", err)
			}
		}(file)
	}
	return nil
}