func()

in scripts/go/curconvert/curconvert.go [393:465]


// ParquetCur converts the gzip-compressed CSV file inputFile into a Parquet
// file written under c.tempDir and returns the path of that Parquet file.
//
// The first CSV record is read and discarded as a header; the Parquet column
// metadata comes from c.CurColumns instead. Columns whose index is a key in
// c.skipCols are omitted from every row. The output name is the base name of
// inputFile up to its first '.', plus ".parquet"
// (e.g. "dir/report-1.csv.gz" -> "<tempDir>/report-1.parquet").
//
// On any failure (open, decompress, CSV parse, Parquet write/flush/finalize,
// or close) it returns "" and a wrapped error. The output file is always
// closed, but a partially written file may be left behind on error.
func (c *CurConvert) ParquetCur(inputFile string) (parquetPath string, err error) {
	// Number of rows written between explicit writer flushes.
	const rowsPerFlush = 5000

	// open input
	file, err := os.Open(inputFile)
	if err != nil {
		return "", err
	}
	defer file.Close()

	// init gzip library on input; report failures instead of exiting the
	// process (log.Fatal would skip the deferred closes and the caller).
	gr, err := gzip.NewReader(file)
	if err != nil {
		return "", fmt.Errorf("opening gzip stream %s: %w", inputFile, err)
	}
	defer gr.Close()

	// init csv reader
	cr := csv.NewReader(gr)

	// Read and ignore the header record; an empty file surfaces here as io.EOF.
	if _, err := cr.Read(); err != nil {
		return "", fmt.Errorf("reading csv header of %s: %w", inputFile, err)
	}

	// Derive the output name from the base name only. Searching the whole
	// path for '.' panicked on paths like "./dir/x.csv.gz" or on names
	// without any dot.
	base := inputFile[strings.LastIndex(inputFile, "/")+1:]
	if dot := strings.Index(base, "."); dot >= 0 {
		base = base[:dot]
	}
	localParquetFile := c.tempDir + "/" + base + ".parquet"

	// create local parquet file
	f, err := ParquetFile.NewLocalFileWriter(localParquetFile)
	if err != nil {
		return "", fmt.Errorf("failed to create parquet file %s, error: %w", localParquetFile, err)
	}
	// Close the output on every return path, and report a close failure on
	// the success path since it may mean the file is incomplete.
	defer func() {
		if cerr := f.Close(); cerr != nil && err == nil {
			parquetPath, err = "", fmt.Errorf("closing parquet file %s: %w", localParquetFile, cerr)
		}
	}()

	// init Parquet writer; c.concurrency is passed through as the writer's
	// parallelism argument.
	ph, err := ParquetWriter.NewCSVWriter(c.CurColumns, f, int64(c.concurrency))
	if err != nil {
		return "", err
	}

	// read all remaining records of CSV file and write to parquet
	pending := 0 // rows written since the last flush
	for {
		rec, err := cr.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return "", fmt.Errorf("reading csv record of %s: %w", inputFile, err)
		}

		row := make([]*string, 0, len(rec))
		for k := range rec {
			if _, skip := c.skipCols[k]; !skip {
				row = append(row, &rec[k])
			}
		}
		if err := ph.WriteString(row); err != nil {
			return "", fmt.Errorf("writing parquet row to %s: %w", localParquetFile, err)
		}

		pending++
		if pending == rowsPerFlush {
			if err := ph.Flush(true); err != nil {
				return "", fmt.Errorf("flushing %s: %w", localParquetFile, err)
			}
			pending = 0
		}
	}

	// Flush any rows left over from the last partial batch.
	if pending > 0 {
		if err := ph.Flush(true); err != nil {
			return "", fmt.Errorf("flushing %s: %w", localParquetFile, err)
		}
	}
	if err := ph.WriteStop(); err != nil {
		return "", fmt.Errorf("finalizing parquet file %s: %w", localParquetFile, err)
	}

	return localParquetFile, nil
}