in scripts/go/curconvert/curconvert.go [393:465]
// ParquetCur converts the gzip-compressed CSV file at inputFile into a
// Parquet file written under c.tempDir and returns the path of that file.
//
// The first CSV record is treated as a header and discarded. For every
// remaining record, the fields whose index is a key of c.skipCols are dropped
// and the rest are handed to a Parquet CSV writer created from c.CurColumns
// and c.concurrency.
//
// The output file is named after the input file's base name truncated at its
// first ".", with ".parquet" appended (for example "dir/report.csv.gz"
// becomes "<tempDir>/report.parquet").
//
// A non-nil error is returned when the input cannot be opened, is not a valid
// gzip stream, has no header record, contains malformed CSV, or when the
// Parquet file or writer cannot be created. The returned path is empty in
// that case.
func (c *CurConvert) ParquetCur(inputFile string) (string, error) {
	// Rows buffered in the writer before an explicit flush is requested.
	const flushEvery = 5000

	// open input
	file, err := os.Open(inputFile)
	if err != nil {
		return "", err
	}
	defer func() {
		// A close failure on the read-only input cannot change the outcome of
		// the conversion, so it is reported through the logger only.
		if cerr := file.Close(); cerr != nil {
			log.Printf("closing input file %s: %v", inputFile, cerr)
		}
	}()

	// init gzip library on input
	gr, err := gzip.NewReader(file)
	if err != nil {
		return "", fmt.Errorf("failed to open gzip stream of %s, error: %v", inputFile, err)
	}
	defer gr.Close()

	// init csv reader, then read and ignore the header record
	cr := csv.NewReader(gr)
	if _, err = cr.Read(); err != nil {
		return "", fmt.Errorf("failed to read CSV header of %s, error: %v", inputFile, err)
	}

	// Derive the output name from the base name only, so that dots in
	// directory components (e.g. "./data/x.csv.gz") or a name without any dot
	// cannot produce an invalid slice range.
	base := inputFile[strings.LastIndex(inputFile, "/")+1:]
	if dot := strings.Index(base, "."); dot >= 0 {
		base = base[:dot]
	}

	// create local parquet file
	localParquetFile := c.tempDir + "/" + base + ".parquet"
	f, err := ParquetFile.NewLocalFileWriter(localParquetFile)
	if err != nil {
		return "", fmt.Errorf("failed to create parquet file %s, error: %s", localParquetFile, err.Error())
	}
	// Deferred so the output handle is released on the error paths below too;
	// on success it runs after WriteStop has finished the file.
	defer f.Close()

	// init Parquet writer
	ph, err := ParquetWriter.NewCSVWriter(c.CurColumns, f, int64(c.concurrency))
	if err != nil {
		return "", err
	}

	// NOTE(review): the results of WriteString, Flush and WriteStop are not
	// inspected because their signatures are not visible from this function;
	// if the parquet library in use returns errors from them, they should be
	// checked and propagated.

	// read all remaining records of CSV file and write to parquet
	pending := 0
	for {
		rec, err := cr.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return "", err
		}

		// Keep only the columns that are not marked to be skipped.
		recParquet := make([]*string, 0, len(rec))
		for k := range rec {
			if _, skip := c.skipCols[k]; !skip {
				recParquet = append(recParquet, &rec[k])
			}
		}
		ph.WriteString(recParquet)

		pending++
		if pending == flushEvery {
			ph.Flush(true)
			pending = 0
		}
	}

	// Flush whatever is left from the last partial batch.
	if pending > 0 {
		ph.Flush(true)
	}
	ph.WriteStop()
	return localParquetFile, nil
}