in reader/reader.go [45:108]
func NewFileReader(filePaths string, batchRows int, batchBytes int, bufferSize int, queues *[]chan []byte, pool *sync.Pool, size *int64) *FileReader {
var files []*os.File
// parse path
paths := strings.Split(filePaths, ",")
for _, filePath := range paths {
matches, err := filepath.Glob(filePath)
if len(matches) == 0 {
log.Errorf("There is no file, file path: %s", filePath)
os.Exit(1)
}
if err != nil {
log.Errorf("Source file pattern match error, error message : %v", err)
os.Exit(1)
}
for _, match := range matches {
fileInfo, err := os.Stat(match)
if err != nil {
log.Errorf("Get file info error, error message : %v", err)
os.Exit(1)
}
if fileInfo.IsDir() {
err = filepath.Walk(match, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
file, err := os.Open(path)
if err != nil {
return err
}
files = append(files, file)
*size += info.Size()
}
return nil
})
if err != nil {
log.Errorf("Failed to traverse directory tree, error message : %v", err)
os.Exit(1)
}
} else {
file, err := os.Open(match)
if err != nil {
log.Errorf("Failed to open file, error message : %v", err)
os.Exit(1)
}
files = append(files, file)
*size += fileInfo.Size()
}
}
}
return &FileReader{
batchRows: batchRows,
batchBytes: batchBytes,
bufferSize: bufferSize,
queues: queues,
files: files,
pool: pool,
}
}