in internal/app/adapters/inbound/file/file_inbound.go [150:193]
// processFiles hands every path in files to f.processFile, either one at a
// time or concurrently.
//
// The mode is selected by the optional "sequential" entry in
// f.config.Parameters, parsed with strconv.ParseBool. When the entry is absent
// files are processed concurrently. When it is present but not a valid boolean
// an error is returned and no file is processed.
//
// Before each file the context is checked. If it has been cancelled, the
// remaining files are skipped, the goroutines already started by this call are
// awaited, and ctx.Err() is returned.
//
// Failures of individual files are logged and never returned; the function
// returns nil once every file has been dispatched.
//
// NOTE(review): on the normal (non-cancelled) path the function returns without
// waiting for the goroutines it started, so concurrent processing may still be
// running after it returns. That looks intentional (f.processingFiles guards
// against picking the same file up twice) — confirm with the caller.
func (f *FileInboundEndpoint) processFiles(ctx context.Context, files []string) error {
	// Concurrent processing is the default; "sequential" opts out of it.
	sequential := false
	if val, exists := f.config.Parameters["sequential"]; exists {
		parsed, err := strconv.ParseBool(val)
		if err != nil {
			// The value is rejected outright rather than silently defaulted,
			// so a typo in the configuration is surfaced to the caller.
			return fmt.Errorf("invalid sequential value %q: must be true/false: %w", val, err)
		}
		sequential = parsed
	}

	var fileWg sync.WaitGroup
	for _, file := range files {
		// Non-blocking cancellation check before each file is dispatched.
		select {
		case <-ctx.Done():
			slog.Info("cancelling remaining file processing")
			// Wait for all processing started by this call to complete
			// before returning.
			fileWg.Wait()
			return ctx.Err()
		default:
		}

		// LoadOrStore atomically claims the file; if an entry already exists
		// the file is skipped.
		// NOTE(review): the entry is not removed in this function —
		// presumably processFile releases it; verify.
		if _, exists := f.processingFiles.LoadOrStore(file, true); exists {
			slog.Debug("skipping file - already being processed", "file", file)
			continue
		}

		if sequential {
			if err := f.processFile(ctx, file); err != nil {
				slog.Error("failed to process file", "file", file, "error", err)
			}
			continue
		}

		fileWg.Add(1)
		go func(fileName string) {
			defer fileWg.Done()
			if err := f.processFile(ctx, fileName); err != nil {
				slog.Error("failed to process file", "file", fileName, "error", err)
			}
		}(file)
	}

	return nil
}