func NewFileReader()

in reader/reader.go [45:108]


// NewFileReader opens every file named by filePaths and returns a FileReader
// holding the resulting handles.
//
// filePaths is a comma-separated list of glob patterns. Each pattern is
// expanded with filepath.Glob; a match that is a directory is walked
// recursively and every non-directory entry under it is opened, while any
// other match is opened directly. The size of each opened file is added to
// *size, so size must be non-nil.
//
// batchRows, batchBytes, bufferSize, queues and pool are stored on the
// returned FileReader unchanged.
//
// Any failure (malformed pattern, pattern with no matches, stat/open/walk
// error) is logged and terminates the process with exit status 1; the
// function never returns an error to the caller. The opened files are not
// closed here.
func NewFileReader(filePaths string, batchRows int, batchBytes int, bufferSize int, queues *[]chan []byte, pool *sync.Pool, size *int64) *FileReader {
	var files []*os.File

	for _, pattern := range strings.Split(filePaths, ",") {
		matches, err := filepath.Glob(pattern)
		// Check the error before the match count: a malformed pattern yields
		// an error with no matches, and testing len(matches) first would
		// misreport it as "no file".
		if err != nil {
			log.Errorf("Source file pattern match error, error message : %v", err)
			os.Exit(1)
		}
		if len(matches) == 0 {
			log.Errorf("There is no file, file path: %s", pattern)
			os.Exit(1)
		}

		for _, match := range matches {
			fileInfo, err := os.Stat(match)
			if err != nil {
				log.Errorf("Get file info error, error message : %v", err)
				os.Exit(1)
			}

			if !fileInfo.IsDir() {
				file, err := os.Open(match)
				if err != nil {
					log.Errorf("Failed to open file, error message : %v", err)
					os.Exit(1)
				}
				files = append(files, file)
				*size += fileInfo.Size()
				continue
			}

			// Directory match: open every non-directory entry beneath it.
			err = filepath.Walk(match, func(path string, info os.FileInfo, walkErr error) error {
				if walkErr != nil {
					return walkErr
				}
				if info.IsDir() {
					return nil
				}

				file, err := os.Open(path)
				if err != nil {
					return err
				}
				files = append(files, file)
				*size += info.Size()
				return nil
			})
			if err != nil {
				log.Errorf("Failed to traverse directory tree, error message : %v", err)
				os.Exit(1)
			}
		}
	}

	return &FileReader{
		batchRows:  batchRows,
		batchBytes: batchBytes,
		bufferSize: bufferSize,
		queues:     queues,
		files:      files,
		pool:       pool,
	}
}