cmd/wfnconvert/wfnconvert.go (183 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bufio"
"context"
"encoding/csv"
"flag"
"fmt"
"io"
"os"
"github.com/facebookincubator/flog"
"golang.org/x/sync/errgroup"
)
// main wires up the command-line flags, validates them and then runs the
// conversion from standard input to standard output.
//
// The custom usage function prints a short banner plus the flag defaults to
// stderr and terminates the process with exit status 1, so a validation
// failure never reaches the conversion step.
func main() {
	var opts options
	opts.addFlags()

	flag.Usage = func() {
		banner := []string{
			"wfnconvert provides various manipulations to WFN attributes",
			"usage: wfnconvert [options]",
			"options:",
		}
		for _, line := range banner {
			fmt.Fprintln(os.Stderr, line)
		}
		flag.PrintDefaults()
		os.Exit(1)
	}
	flag.Parse()

	if verr := opts.validate(); verr != nil {
		fmt.Fprintf(os.Stderr, "wfnconvert: %v\n\n", verr)
		// Usage exits the process; execution does not continue past it.
		flag.Usage()
	}

	if rerr := wfnconvert(os.Stdin, os.Stdout, &opts); rerr != nil {
		flog.Fatalf("wfnconvert: %v\n", rerr)
	}
}
// wfnconvert runs a three-stage pipeline (read -> process -> write) between
// in and out, with each stage in its own goroutine of an errgroup.
//
// When o.csvFields is empty the input is handled line by line; otherwise it
// is handled as CSV using o.csvComma as the delimiter. The reading stage
// closes the channel feeding the processor, and the processing stage closes
// the channel feeding the writer, so downstream stages finish once upstream
// is done. The first error returned by any stage is returned from g.Wait.
func wfnconvert(in io.Reader, out io.Writer, o *options) error {
	var (
		records = make(chan []string)
		results = make(chan []string)
	)
	grp, ctx := errgroup.WithContext(context.Background())

	var read, transform, write func() error
	if len(o.csvFields) > 0 {
		read = readCSV(ctx, in, o.csvComma, records)
		transform = processCSV(ctx, o, records, results)
		write = writeCSV(out, o.csvComma, results)
	} else {
		read = readLines(ctx, in, records)
		transform = processLines(ctx, o, records, results)
		write = writeLines(out, results)
	}

	// Each producer owns (and therefore closes) the channel it sends on.
	grp.Go(func() error {
		defer close(records)
		return read()
	})
	grp.Go(func() error {
		defer close(results)
		return transform()
	})
	grp.Go(write)

	return grp.Wait()
}
// readLines returns a function that scans in line by line and sends every
// line on ch as a one-element slice.
//
// The returned function stops with ctx.Err() if ctx is cancelled while it is
// waiting to send, and reports a scanner failure as "input error: ...". It
// does not close ch; that is left to the caller.
func readLines(ctx context.Context, in io.Reader, ch chan<- []string) func() error {
	sc := bufio.NewScanner(in)
	return func() error {
		for {
			if !sc.Scan() {
				break
			}
			record := []string{sc.Text()}
			select {
			case ch <- record:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		err := sc.Err()
		if err == nil {
			return nil
		}
		return fmt.Errorf("input error: %v", err)
	}
}
// readCSV returns a function that reads CSV records from in and sends each
// record on ch.
//
// comma supplies the field delimiter: its first rune is used. The delimiter
// is decoded as a rune rather than taken from the first byte, so a multi-byte
// UTF-8 delimiter is handled correctly. If comma is empty, the csv package's
// default delimiter is kept.
//
// The returned function stops with ctx.Err() if ctx is cancelled while it is
// waiting to send, and reports a read failure as "input error: ...". It does
// not close ch; that is left to the caller.
func readCSV(ctx context.Context, in io.Reader, comma string, ch chan<- []string) func() error {
	r := csv.NewReader(in)
	if delim := []rune(comma); len(delim) > 0 {
		r.Comma = delim[0]
	}
	return func() error {
		for {
			ss, err := r.Read()
			if err == io.EOF {
				break
			}
			if err != nil {
				return fmt.Errorf("input error: %v", err)
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case ch <- ss:
			}
		}
		return nil
	}
}
// writeLines returns a function that drains ch and writes the first element
// of every non-empty record to out, followed by a newline. Empty records are
// skipped and any elements after the first are ignored.
//
// A failed write is reported as "output error: ..." and stops the loop
// immediately, rather than being silently dropped.
func writeLines(out io.Writer, ch <-chan []string) func() error {
	return func() error {
		for ss := range ch {
			if len(ss) == 0 {
				continue
			}
			if _, err := fmt.Fprintln(out, ss[0]); err != nil {
				return fmt.Errorf("output error: %v", err)
			}
		}
		return nil
	}
}
// writeCSV returns a function that drains ch and writes every record to out
// as one CSV row, flushing the writer once ch is closed.
//
// comma supplies the field delimiter: its first rune is used. The delimiter
// is decoded as a rune rather than taken from the first byte, so a multi-byte
// UTF-8 delimiter is handled correctly. If comma is empty, the csv package's
// default delimiter is kept.
//
// Failures from writing or flushing are reported as "output error: ...".
func writeCSV(out io.Writer, comma string, ch <-chan []string) func() error {
	w := csv.NewWriter(out)
	if delim := []rune(comma); len(delim) > 0 {
		w.Comma = delim[0]
	}
	return func() error {
		for ss := range ch {
			if err := w.Write(ss); err != nil {
				return fmt.Errorf("output error: %v", err)
			}
		}
		// Write buffers internally; Flush pushes the data to out and Error
		// reports anything that went wrong while doing so.
		w.Flush()
		if err := w.Error(); err != nil {
			return fmt.Errorf("output error: %v", err)
		}
		return nil
	}
}
// processLines returns a function that receives records from in, passes the
// first element of each through process, stores the result back into that
// element and forwards the record on out.
//
// Empty records are dropped. The returned function ends with nil once in is
// closed, with ctx.Err() if ctx is cancelled while receiving or sending, and
// with a "could not process" error if process fails. It does not close out;
// that is left to the caller.
func processLines(ctx context.Context, o *options, in <-chan []string, out chan<- []string) func() error {
	return func() error {
		for {
			var record []string
			select {
			case <-ctx.Done():
				return ctx.Err()
			case received, open := <-in:
				if !open {
					return nil
				}
				record = received
			}

			if len(record) == 0 {
				continue
			}

			converted, err := process(record[0], o)
			if err != nil {
				return fmt.Errorf("could not process %q: %v", record[0], err)
			}
			record[0] = converted

			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- record:
			}
		}
	}
}
// processCSV returns a function that receives CSV records from in, passes the
// fields selected by o.csvFields through process, stores each result back
// into its field and forwards the record on out.
//
// Field indices that lie beyond the end of a record are skipped for that
// record. The returned function ends with nil once in is closed, with
// ctx.Err() if ctx is cancelled while receiving or sending, and with a
// "could not process" error if process fails. It does not close out; that is
// left to the caller.
func processCSV(ctx context.Context, o *options, in <-chan []string, out chan<- []string) func() error {
	return func() error {
		for {
			var record []string
			select {
			case <-ctx.Done():
				return ctx.Err()
			case received, open := <-in:
				if !open {
					return nil
				}
				record = received
			}

			for _, n := range o.csvFields {
				if n < len(record) {
					converted, err := process(record[n], o)
					if err != nil {
						return fmt.Errorf("could not process %q: %v", record[n], err)
					}
					record[n] = converted
				}
			}

			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- record:
			}
		}
	}
}