func processInput()

in cmd/cpe2cve/cpe2cve.go [122:176]


func processInput(in io.Reader, out io.Writer, caches map[string]*cvefeed.Cache, cfg config) chan struct{} {
	done := make(chan struct{})
	procIn := make(chan []string)
	procOut := make(chan []string)

	r := csv.NewReader(in)
	r.Comma = rune(cfg.InFieldSeparator[0])

	w := csv.NewWriter(out)
	w.Comma = rune(cfg.OutFieldSeparator[0])

	// spawn processing goroutines
	var linesProcessed uint64
	var procWG sync.WaitGroup
	procWG.Add(cfg.NumProcessors)
	for i := 0; i < cfg.NumProcessors; i++ {
		go func() {
			processAll(procIn, procOut, caches, cfg, &linesProcessed)
			procWG.Done()
		}()
	}

	// write processed results in background
	go func() {
		for rec := range procOut {
			if err := w.Write(rec); err != nil {
				flog.Errorf("write error: %v", err)
			}
			w.Flush()
		}
		if err := w.Error(); err != nil {
			flog.Errorf("write error: %v", err)
		}
		close(done)
	}()

	start := time.Now()
	// main goroutine reads input and sends it to processors
	for line := 1; ; line++ {
		rec, err := r.Read()
		if err != nil {
			if err == io.EOF {
				break
			}
			flog.Errorf("read error at line %d: %v", line, err)
		}
		procIn <- rec
	}

	close(procIn)
	procWG.Wait()
	close(procOut)
	flog.V(1).Infof("processed %d lines in %v", linesProcessed, time.Since(start))
	return done
}