in cmd/cpe2cve/cpe2cve.go [122:176]
func processInput(in io.Reader, out io.Writer, caches map[string]*cvefeed.Cache, cfg config) chan struct{} {
done := make(chan struct{})
procIn := make(chan []string)
procOut := make(chan []string)
r := csv.NewReader(in)
r.Comma = rune(cfg.InFieldSeparator[0])
w := csv.NewWriter(out)
w.Comma = rune(cfg.OutFieldSeparator[0])
// spawn processing goroutines
var linesProcessed uint64
var procWG sync.WaitGroup
procWG.Add(cfg.NumProcessors)
for i := 0; i < cfg.NumProcessors; i++ {
go func() {
processAll(procIn, procOut, caches, cfg, &linesProcessed)
procWG.Done()
}()
}
// write processed results in background
go func() {
for rec := range procOut {
if err := w.Write(rec); err != nil {
flog.Errorf("write error: %v", err)
}
w.Flush()
}
if err := w.Error(); err != nil {
flog.Errorf("write error: %v", err)
}
close(done)
}()
start := time.Now()
// main goroutine reads input and sends it to processors
for line := 1; ; line++ {
rec, err := r.Read()
if err != nil {
if err == io.EOF {
break
}
flog.Errorf("read error at line %d: %v", line, err)
}
procIn <- rec
}
close(procIn)
procWG.Wait()
close(procOut)
flog.V(1).Infof("processed %d lines in %v", linesProcessed, time.Since(start))
return done
}