io/io.go (167 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package io import ( "context" "errors" "fmt" "io" "io/fs" "net/url" "strings" "gocloud.dev/blob" "gocloud.dev/blob/memblob" ) // IO is an interface to a hierarchical file system. // // The IO interface is the minimum implementation required for a file // system to utilize an iceberg table. A file system may implement // additional interfaces, such as ReadFileIO, to provide additional or // optimized functionality. type IO interface { // Open opens the named file. // // When Open returns an error, it should be of type *PathError // with the Op field set to "open", the Path field set to name, // and the Err field describing the problem. // // Open should reject attempts to open names that do not satisfy // fs.ValidPath(name), returning a *PathError with Err set to // ErrInvalid or ErrNotExist. Open(name string) (File, error) // Remove removes the named file or (empty) directory. // // If there is an error, it will be of type *PathError. Remove(name string) error } // ReadFileIO is the interface implemented by a file system that // provides an optimized implementation of ReadFile. type ReadFileIO interface { IO // ReadFile reads the named file and returns its contents. // A successful call returns a nil error, not io.EOF. // (Because ReadFile reads the whole file, the expected EOF // from the final Read is not treated as an error to be reported.) // // The caller is permitted to modify the returned byte slice. // This method should return a copy of the underlying data. ReadFile(name string) ([]byte, error) } // WriteFileIO is the interface implemented by a file system that // provides an optimized implementation of WriteFile type WriteFileIO interface { IO // Create attempts to create the named file and return a writer // for it. Create(name string) (FileWriter, error) // WriteFile writes p to the named file. WriteFile(name string, p []byte) error } // A File provides access to a single file. The File interface is the // minimum implementation required for Iceberg to interact with a file. // Directory files should also implement type File interface { fs.File io.ReadSeekCloser io.ReaderAt } // A FileWriter represents an open writable file. type FileWriter interface { io.WriteCloser io.ReaderFrom } // A ReadDirFile is a directory file whose entries can be read with the // ReadDir method. Every directory file should implement this interface. // (It is permissible for any file to implement this interface, but // if so ReadDir should return an error for non-directories.) type ReadDirFile interface { File // ReadDir read the contents of the directory and returns a slice // of up to n DirEntry values in directory order. Subsequent calls // on the same file will yield further DirEntry values. // // If n > 0, ReadDir returns at most n DirEntry structures. In this // case, if ReadDir returns an empty slice, it will return a non-nil // error explaining why. // // At the end of a directory, the error is io.EOF. (ReadDir must return // io.EOF itself, not an error wrapping io.EOF.) // // If n <= 0, ReadDir returns all the DirEntry values from the directory // in a single slice. In this case, if ReadDir succeeds (reads all the way // to the end of the directory), it returns the slice and a nil error. // If it encounters an error before the end of the directory, ReadDir // returns the DirEntry list read until that point and a non-nil error. ReadDir(n int) ([]fs.DirEntry, error) } // FS wraps an io/fs.FS as an IO interface. func FS(fsys fs.FS) IO { if _, ok := fsys.(fs.ReadFileFS); ok { return readFileFS{ioFS{fsys, nil}} } return ioFS{fsys, nil} } // FSPreProcName wraps an io/fs.FS like FS, only if fn is non-nil then // it is called to preprocess any filenames before they are passed to // the underlying fsys. func FSPreProcName(fsys fs.FS, fn func(string) string) IO { if _, ok := fsys.(fs.ReadFileFS); ok { return readFileFS{ioFS{fsys, fn}} } return ioFS{fsys, fn} } type readFileFS struct { ioFS } func (r readFileFS) ReadFile(name string) ([]byte, error) { if r.preProcessName != nil { name = r.preProcessName(name) } rfs, ok := r.fsys.(fs.ReadFileFS) if !ok { return nil, errMissingReadFile } return rfs.ReadFile(name) } type ioFS struct { fsys fs.FS preProcessName func(string) string } func (f ioFS) Open(name string) (File, error) { if f.preProcessName != nil { name = f.preProcessName(name) } if name == "/" { name = "." } else { name = strings.TrimPrefix(name, "/") } file, err := f.fsys.Open(name) if err != nil { return nil, err } return ioFile{file}, nil } func (f ioFS) Remove(name string) error { r, ok := f.fsys.(interface{ Remove(name string) error }) if !ok { return errMissingRemove } return r.Remove(name) } var ( errMissingReadDir = errors.New("fs.File directory missing ReadDir method") errMissingSeek = errors.New("fs.File missing Seek method") errMissingReadAt = errors.New("fs.File missing ReadAt") errMissingRemove = errors.New("fs.FS missing Remove method") errMissingReadFile = errors.New("fs.FS missing ReadFile method") ) type ioFile struct { file fs.File } func (f ioFile) Close() error { return f.file.Close() } func (f ioFile) Read(b []byte) (int, error) { return f.file.Read(b) } func (f ioFile) Stat() (fs.FileInfo, error) { return f.file.Stat() } func (f ioFile) Seek(offset int64, whence int) (int64, error) { s, ok := f.file.(io.Seeker) if !ok { return 0, errMissingSeek } return s.Seek(offset, whence) } func (f ioFile) ReadAt(p []byte, off int64) (n int, err error) { r, ok := f.file.(io.ReaderAt) if !ok { return 0, errMissingReadAt } return r.ReadAt(p, off) } func (f ioFile) ReadDir(count int) ([]fs.DirEntry, error) { d, ok := f.file.(fs.ReadDirFile) if !ok { return nil, errMissingReadDir } return d.ReadDir(count) } func inferFileIOFromSchema(ctx context.Context, path string, props map[string]string) (IO, error) { parsed, err := url.Parse(path) if err != nil { return nil, err } var bucket *blob.Bucket switch parsed.Scheme { case "s3", "s3a", "s3n": bucket, err = createS3Bucket(ctx, parsed, props) if err != nil { return nil, err } case "gs": bucket, err = createGCSBucket(ctx, parsed, props) if err != nil { return nil, err } case "mem": // memblob doesn't use the URL host or path bucket = memblob.OpenBucket(nil) case "file", "": return LocalFS{}, nil case "abfs", "abfss", "wasb", "wasbs": bucket, err = createAzureBucket(ctx, parsed, props) if err != nil { return nil, err } default: return nil, fmt.Errorf("IO for file '%s' not implemented", path) } return createBlobFS(ctx, bucket, parsed.Host), nil } // LoadFS takes a map of properties and an optional URI location // and attempts to infer an IO object from it. // // A schema of "file://" or an empty string will result in a LocalFS // implementation. Otherwise this will return an error if the schema // does not yet have an implementation here. // // Currently local, S3, GCS, and In-Memory FSs are implemented. func LoadFS(ctx context.Context, props map[string]string, location string) (IO, error) { if location == "" { location = props["warehouse"] } iofs, err := inferFileIOFromSchema(ctx, location, props) if err != nil { return nil, err } if iofs == nil { iofs = LocalFS{} } return iofs, nil }