pkg/fs/local_file_system.go (497 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package fs (file system) is an independent component to operate file and directory.
package fs
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"
"github.com/shirou/gopsutil/v3/disk"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/pool"
)
const defaultIOSize = 256 * 1024
// localFileSystem implements the File System interface.
type localFileSystem struct {
logger *logger.Logger
}
// LocalFile implements the File interface.
type LocalFile struct {
file *os.File
}
// NewLocalFileSystem is used to create the Local File system.
func NewLocalFileSystem() FileSystem {
return &localFileSystem{
logger: logger.GetLogger(moduleName),
}
}
// NewLocalFileSystemWithLogger is used to create the Local File system with logger.
func NewLocalFileSystemWithLogger(parent *logger.Logger) FileSystem {
return &localFileSystem{
logger: parent.Named(moduleName),
}
}
func readErrorHandle(operation string, err error, name string, size int) (int, error) {
switch {
case errors.Is(err, io.EOF):
return size, err
case os.IsNotExist(err):
return size, &FileSystemError{
Code: IsNotExistError,
Message: fmt.Sprintf("%s failed, file is not exist, file name: %s, error message: %s", operation, name, err),
}
case os.IsPermission(err):
return size, &FileSystemError{
Code: permissionError,
Message: fmt.Sprintf("%s failed, there is not enough permission, file name: %s, error message: %s", operation, name, err),
}
default:
return size, &FileSystemError{
Code: readError,
Message: fmt.Sprintf("%s failed, read file error, file name: %s, read file size: %d, error message: %s", operation, name, size, err),
}
}
}
func (fs *localFileSystem) MkdirIfNotExist(path string, permission Mode) {
if fs.pathExist(path) {
return
}
fs.mkdir(path, permission)
}
func (fs *localFileSystem) MkdirPanicIfExist(path string, permission Mode) {
if fs.pathExist(path) {
fs.logger.Panic().Str("path", path).Msg("directory is exist")
}
fs.mkdir(path, permission)
}
func (fs *localFileSystem) mkdir(path string, permission Mode) {
if err := os.MkdirAll(path, os.FileMode(permission)); err != nil {
fs.logger.Panic().Str("path", path).Err(err).Msg("failed to create directory")
}
parentDirPath := filepath.Dir(path)
fs.SyncPath(parentDirPath)
}
func (fs *localFileSystem) pathExist(path string) bool {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false
}
fs.logger.Panic().Str("path", path).Err(err).Msg("failed to stat path")
}
return true
}
func (fs *localFileSystem) ReadDir(dirname string) []DirEntry {
des, err := os.ReadDir(dirname)
if err != nil {
fs.logger.Panic().Str("dirname", dirname).Err(err).Msg("failed to read directory")
}
result := make([]DirEntry, len(des))
for i, de := range des {
result[i] = DirEntry(de)
}
return result
}
// CreateFile is used to create and open the file by specified name and mode.
func (fs *localFileSystem) CreateFile(name string, permission Mode) (File, error) {
file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission))
switch {
case err == nil:
return &LocalFile{
file: file,
}, nil
case os.IsExist(err):
return nil, &FileSystemError{
Code: isExistError,
Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err),
}
case os.IsPermission(err):
return nil, &FileSystemError{
Code: permissionError,
Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err),
}
default:
return nil, &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err),
}
}
}
func (fs *localFileSystem) OpenFile(name string) (File, error) {
file, err := os.Open(name)
switch {
case err == nil:
return &LocalFile{
file: file,
}, nil
case os.IsNotExist(err):
return nil, &FileSystemError{
Code: IsNotExistError,
Message: fmt.Sprintf("File is not exist, file name: %s,error message: %s", name, err),
}
case os.IsPermission(err):
return nil, &FileSystemError{
Code: permissionError,
Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", name, err),
}
default:
return nil, &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err),
}
}
}
// Write flushes all data to one file.
func (fs *localFileSystem) Write(buffer []byte, name string, permission Mode) (int, error) {
file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission))
if err != nil {
switch {
case os.IsExist(err):
return 0, &FileSystemError{
Code: isExistError,
Message: fmt.Sprintf("File is exist, file name: %s,error message: %s", name, err),
}
case os.IsPermission(err):
return 0, &FileSystemError{
Code: permissionError,
Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d,error message: %s", name, permission, err),
}
default:
return 0, &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Create file return error, file name: %s,error message: %s", name, err),
}
}
}
defer file.Close()
size, err := file.Write(buffer)
if err != nil {
return size, &FileSystemError{
Code: flushError,
Message: fmt.Sprintf("Flush file return error, file name: %s,error message: %s", name, err),
}
}
return size, nil
}
// Read is used to read the entire file using streaming read.
func (fs *localFileSystem) Read(name string) ([]byte, error) {
data, err := os.ReadFile(name)
switch {
case err == nil:
return data, nil
case os.IsNotExist(err):
return data, &FileSystemError{
Code: IsNotExistError,
Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", name, err),
}
case os.IsPermission(err):
return data, &FileSystemError{
Code: permissionError,
Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", name, err),
}
default:
return data, &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Read file error, file name: %s, error message: %s", name, err),
}
}
}
// DeleteFile is used to delete the file.
func (fs *localFileSystem) DeleteFile(name string) error {
err := os.Remove(name)
switch {
case err == nil:
return nil
case os.IsNotExist(err):
return &FileSystemError{
Code: IsNotExistError,
Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", name, err),
}
case os.IsPermission(err):
return &FileSystemError{
Code: permissionError,
Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", name, err),
}
default:
return &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Delete file error, file name: %s, error message: %s", name, err),
}
}
}
func (fs *localFileSystem) MustRMAll(path string) {
if err := os.RemoveAll(path); err == nil {
return
}
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
if err := os.RemoveAll(path); err == nil {
return
}
}
fs.logger.Panic().Str("path", path).Msg("failed to remove all files under path")
}
func (fs *localFileSystem) MustGetFreeSpace(path string) uint64 {
usage, err := disk.Usage(path)
if err != nil {
fs.logger.Panic().Str("path", path).Err(err).Msg("failed to get disk usage")
}
return usage.Free
}
func (fs *localFileSystem) CreateHardLink(srcPath, destPath string, filter func(string) bool) error {
fi, err := os.Stat(srcPath)
if err != nil {
return &FileSystemError{
Code: IsNotExistError,
Message: fmt.Sprintf("Source path does not exist: %s, error: %v", srcPath, err),
}
}
if !fi.IsDir() {
if err = os.Link(srcPath, destPath); err != nil {
code := otherError
if os.IsExist(err) {
code = isExistError
} else if os.IsPermission(err) {
code = permissionError
}
return &FileSystemError{
Code: code,
Message: fmt.Sprintf("Failed to create hard link from %s to %s: %v", srcPath, destPath, err),
}
}
return nil
}
err = filepath.Walk(srcPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Error accessing path %s: %v", path, err),
}
}
if path == srcPath {
if info.IsDir() {
if err = os.MkdirAll(destPath, info.Mode()); err != nil {
return &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Failed to create destination root directory %s: %v", destPath, err),
}
}
}
return nil
}
relPath, err := filepath.Rel(srcPath, path)
if err != nil {
return &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Failed to get relative path for %s: %v", path, err),
}
}
destFullPath := filepath.Join(destPath, relPath)
if info.IsDir() {
if err := os.MkdirAll(destFullPath, info.Mode()); err != nil {
return &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Failed to create directory %s: %v", destFullPath, err),
}
}
return nil
}
if filter != nil && !filter(path) {
return nil
}
parentDir := filepath.Dir(destFullPath)
if err := os.MkdirAll(parentDir, 0o755); err != nil {
return &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Failed to create parent directory %s: %v", parentDir, err),
}
}
if err := os.Link(path, destFullPath); err != nil {
code := otherError
if os.IsExist(err) {
code = isExistError
} else if os.IsPermission(err) {
code = permissionError
}
return &FileSystemError{
Code: code,
Message: fmt.Sprintf("Failed to create hard link from %s to %s: %v", path, destFullPath, err),
}
}
return nil
})
if err != nil {
return err
}
fs.SyncPath(destPath)
return nil
}
// Write adds new data to the end of a file.
func (file *LocalFile) Write(buffer []byte) (int, error) {
size, err := file.file.Write(buffer)
switch {
case err == nil:
return size, nil
case os.IsNotExist(err):
return size, &FileSystemError{
Code: IsNotExistError,
Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err),
}
case os.IsPermission(err):
return size, &FileSystemError{
Code: permissionError,
Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err),
}
default:
// include io.ErrShortWrite
return size, &FileSystemError{
Code: writeError,
Message: fmt.Sprintf("Write file error, file name: %s, error message: %s", file.file.Name(), err),
}
}
}
// Writev supports appending consecutive buffers to the end of the file.
func (file *LocalFile) Writev(iov *[][]byte) (int, error) {
var size int
for _, buffer := range *iov {
wsize, err := file.file.Write(buffer)
switch {
case err == nil:
size += wsize
case os.IsNotExist(err):
return size, &FileSystemError{
Code: IsNotExistError,
Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err),
}
case os.IsPermission(err):
return size, &FileSystemError{
Code: permissionError,
Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err),
}
default:
// include io.ErrShortWrite
return size, &FileSystemError{
Code: writeError,
Message: fmt.Sprintf("Write file error, file name: %s, error message: %s", file.file.Name(), err),
}
}
}
return size, nil
}
// SequentialWrite supports appending consecutive buffers to the end of the file.
func (file *LocalFile) SequentialWrite() SeqWriter {
writer := generateWriter(file.file)
return &seqWriter{writer: writer, fileName: file.file.Name()}
}
// Read is used to read a specified location of file.
func (file *LocalFile) Read(offset int64, buffer []byte) (int, error) {
rsize, err := file.file.ReadAt(buffer, offset)
if err != nil {
return readErrorHandle("Read operation", err, file.file.Name(), rsize)
}
return rsize, nil
}
// Readv is used to read contiguous regions of a file and disperse them into discontinuous buffers.
func (file *LocalFile) Readv(offset int64, iov *[][]byte) (int, error) {
var size int
for _, buffer := range *iov {
rsize, err := file.file.ReadAt(buffer, offset)
if err != nil {
return readErrorHandle("Readv operation", err, file.file.Name(), rsize)
}
size += rsize
offset += int64(rsize)
}
return size, nil
}
// SequentialRead is used to read the entire file using streaming read.
func (file *LocalFile) SequentialRead() SeqReader {
reader := generateReader(file.file)
return &seqReader{reader: reader, fileName: file.file.Name()}
}
// Size is used to get the file written data's size and return an error if the file does not exist. The unit of file size is Byte.
func (file *LocalFile) Size() (int64, error) {
fileInfo, err := os.Stat(file.file.Name())
if err != nil {
if os.IsNotExist(err) {
return -1, &FileSystemError{
Code: IsNotExistError,
Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err),
}
} else if os.IsPermission(err) {
return -1, &FileSystemError{
Code: permissionError,
Message: fmt.Sprintf("There is not enough permission, file name: %s, error message: %s", file.file.Name(), err),
}
}
return -1, &FileSystemError{
Code: otherError,
Message: fmt.Sprintf("Get file size error, file name: %s, error message: %s", file.file.Name(), err),
}
}
return fileInfo.Size(), nil
}
// Path returns the absolute path of the file.
func (file *LocalFile) Path() string {
return file.file.Name()
}
// Close is used to close File.
func (file *LocalFile) Close() error {
if err := syncFile(file.file); err != nil {
return err
}
if err := file.file.Close(); err != nil {
return &FileSystemError{
Code: closeError,
Message: fmt.Sprintf("Close File error, directory name: %s, error message: %s", file.file.Name(), err),
}
}
return nil
}
type seqReader struct {
reader *bufio.Reader
fileName string
}
func (i *seqReader) Read(p []byte) (int, error) {
rsize, err := i.reader.Read(p)
if err != nil {
return readErrorHandle("ReadStream operation", err, i.fileName, rsize)
}
return rsize, nil
}
func (i *seqReader) Path() string {
return i.fileName
}
func (i *seqReader) Close() error {
releaseReader(i.reader)
return nil
}
type seqWriter struct {
writer *bufio.Writer
fileName string
}
func (w *seqWriter) Write(p []byte) (n int, err error) {
return w.writer.Write(p)
}
func (w *seqWriter) Path() string {
return w.fileName
}
func (w *seqWriter) Close() error {
defer releaseWriter(w.writer)
if err := w.writer.Flush(); err != nil {
return &FileSystemError{
Code: closeError,
Message: fmt.Sprintf("Flush File error, directory name: %s, error message: %s", w.fileName, err),
}
}
return nil
}
func generateReader(f *os.File) *bufio.Reader {
v := bufReaderPool.Get()
if v == nil {
return bufio.NewReaderSize(f, defaultIOSize)
}
br := v
br.Reset(f)
return br
}
func releaseReader(br *bufio.Reader) {
br.Reset(nil)
bufReaderPool.Put(br)
}
var bufReaderPool = pool.Register[*bufio.Reader]("fs-bufReader")
func generateWriter(f *os.File) *bufio.Writer {
v := bufWriterPool.Get()
if v == nil {
return bufio.NewWriterSize(f, defaultIOSize)
}
bw := v
bw.Reset(f)
return bw
}
func releaseWriter(bw *bufio.Writer) {
bw.Reset(nil)
bufWriterPool.Put(bw)
}
var bufWriterPool = pool.Register[*bufio.Writer]("fs-bufWriter")