internal/gcsx/temp_file.go (226 lines of code) (raw):

// Copyright 2015 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package gcsx import ( "fmt" "io" "math" "os" "time" "github.com/jacobsa/fuse/fsutil" "github.com/jacobsa/timeutil" ) // TempFile is a temporary file that keeps track of the lowest offset at which // it has been modified. // // Not safe for concurrent access. type TempFile interface { // Panic if any internal invariants are violated. CheckInvariants() // Semantics matching os.File. io.ReadSeeker io.ReaderAt io.WriterAt Truncate(n int64) (err error) // Retrieve the file name Name() string // Return information about the current state of the content. May invalidate // the seek position. Stat() (sr StatResult, err error) // Explicitly set the mtime that will return in stat results. This will stick // until another method that modifies the file is called. SetMtime(mtime time.Time) // Throw away the resources used by the temporary file. The object must not // be used again. Destroy() } // StatResult stores the result of a stat operation. type StatResult struct { // The current size in bytes of the content. Size int64 // The largest value T such that we are sure that the range of bytes [0, T) // is unmodified from the original content with which the temp file was // created. DirtyThreshold int64 // The mtime of the temp file is updated according to the temp file's clock // with each call to a method that modified its content, and is also updated // when the user explicitly calls SetMtime. // // If neither of those things has ever happened, it is nil. This implies that // DirtyThreshold == Size. Mtime *time.Time } // NewTempFile creates a temp file whose initial contents are given by the // supplied reader. dir is a directory on whose file system the inode will live, // or the system default temporary location if empty. func NewTempFile( source io.ReadCloser, dir string, clock timeutil.Clock) (tf TempFile, err error) { // Create an anonymous file to wrap. When we close it, its resources will be // magically cleaned up. f, err := fsutil.AnonymousFile(dir) if err != nil { err = fmt.Errorf("AnonymousFile: %w", err) return } tf = &tempFile{ source: source, state: fileIncomplete, clock: clock, f: f, dirtyThreshold: 0, } return } // NewCacheFile creates a wrapper temp file whose initial contents are given by the // supplied source. dir is a directory on whose file system the file will live, // or the system default temporary location if empty. func NewCacheFile( source io.ReadCloser, f *os.File, dir string, clock timeutil.Clock) (tf TempFile) { tf = &tempFile{ source: source, state: fileIncomplete, clock: clock, f: f, dirtyThreshold: 0, } return } func RecoverCacheFile( source *os.File, dir string, clock timeutil.Clock) (tf TempFile, err error) { stat, err := source.Stat() if err != nil { return nil, fmt.Errorf("could not retrieve file stat: %w", err) } tf = &tempFile{ source: source, state: fileComplete, clock: clock, f: source, dirtyThreshold: stat.Size(), } return } type fileState string const ( fileIncomplete fileState = "fileIncomplete" fileComplete = "fileComplete" fileDirty = "fileDirty" fileDestroyed = "fileDestroyed" ) type tempFile struct { ///////////////////////// // Dependencies ///////////////////////// clock timeutil.Clock source io.ReadCloser ///////////////////////// // Mutable state ///////////////////////// state fileState // A file containing our current contents. f *os.File // The lowest byte index that has been modified from the initial contents. // // INVARIANT: Stat().DirtyThreshold <= Stat().Size dirtyThreshold int64 // The time at which a method that modifies our contents was last called, or // nil if never. // // INVARIANT: mtime == nil => Stat().DirtyThreshold == Stat().Size mtime *time.Time } //////////////////////////////////////////////////////////////////////// // Public interface //////////////////////////////////////////////////////////////////////// func (tf *tempFile) CheckInvariants() { if tf.state == fileDestroyed { panic("Use of destroyed tempFile object.") } // Restore the seek position after using Stat below. pos, err := tf.Seek(0, 1) if err != nil { panic(fmt.Errorf("seek: %w", err)) } defer func() { _, err := tf.Seek(pos, 0) if err != nil { panic(fmt.Errorf("seek: %w", err)) } }() // INVARIANT: Stat().DirtyThreshold <= Stat().Size sr, err := tf.Stat() if err != nil { panic(fmt.Errorf("stat: %w", err)) } if !(sr.DirtyThreshold <= sr.Size) { panic(fmt.Errorf("mismatch: %d vs. %d", sr.DirtyThreshold, sr.Size)) } // INVARIANT: mtime == nil => Stat().DirtyThreshold == Stat().Size if tf.mtime == nil && sr.DirtyThreshold != sr.Size { panic(fmt.Errorf("mismatch: %d vs. %d", sr.DirtyThreshold, sr.Size)) } } func (tf *tempFile) Destroy() { tf.state = fileDestroyed // Throw away the file (for anonymous files). tf.f.Close() tf.f = nil } func (tf *tempFile) Read(p []byte) (int, error) { err := tf.ensureComplete() if err != nil { return 0, fmt.Errorf("cannot Read incomplete file: %w", err) } return tf.f.Read(p) } func (tf *tempFile) Seek(offset int64, whence int) (int64, error) { err := tf.ensureComplete() if err != nil { return 0, fmt.Errorf("cannot Seek incomplete file: %w", err) } return tf.f.Seek(offset, whence) } func (tf *tempFile) ReadAt(p []byte, offset int64) (int, error) { err := tf.ensureComplete() if err != nil { return 0, fmt.Errorf("cannot ReadAt incomplete file: %w", err) } return tf.f.ReadAt(p, offset) } func (tf *tempFile) Stat() (sr StatResult, err error) { err = tf.ensureComplete() if err != nil { err = fmt.Errorf("cannot Stat incomplete file: %w", err) return } sr.DirtyThreshold = tf.dirtyThreshold sr.Mtime = tf.mtime // Get the size from the file. sr.Size, err = tf.f.Seek(0, 2) if err != nil { err = fmt.Errorf("seek: %w", err) return } return } func (tf *tempFile) WriteAt(p []byte, offset int64) (int, error) { err := tf.ensureComplete() if err != nil { return 0, fmt.Errorf("cannot WriteAt incomplete file: %w", err) } // Update our state regarding being dirty. tf.dirtyThreshold = minInt64(tf.dirtyThreshold, offset) tf.state = fileDirty newMtime := tf.clock.Now() tf.mtime = &newMtime // Call through. return tf.f.WriteAt(p, offset) } func (tf *tempFile) Truncate(n int64) error { err := tf.ensureComplete() if err != nil { return fmt.Errorf("cannot Truncate incomplete file: %w", err) } // Update our state regarding being dirty. tf.dirtyThreshold = minInt64(tf.dirtyThreshold, n) tf.state = fileDirty newMtime := tf.clock.Now() tf.mtime = &newMtime // Call through. return tf.f.Truncate(n) } func (tf *tempFile) SetMtime(mtime time.Time) { tf.mtime = &mtime } func (tf *tempFile) Name() string { return tf.f.Name() } //////////////////////////////////////////////////////////////////////// // Helpers //////////////////////////////////////////////////////////////////////// func minInt64(a int64, b int64) int64 { if a < b { return a } return b } const ( minCopyLength = 64 * 1024 * 1024 // 64 MiB ) func (tf *tempFile) ensure(limit int64) error { switch tf.state { case fileIncomplete: size, err := tf.f.Seek(0, 2) if size >= limit { return nil } n := limit - size if n < minCopyLength { n = minCopyLength } n, err = io.CopyN(tf.f, tf.source, n) if err == io.EOF { tf.source.Close() tf.dirtyThreshold = size + n tf.state = fileComplete err = nil } return err case fileComplete, fileDirty: // already completed return nil case fileDestroyed: return fmt.Errorf("file destroyed") } return nil } func (tf *tempFile) ensureComplete() error { err := tf.ensure(math.MaxInt64) if err != nil { err = fmt.Errorf("load temp file: %w", err) } return err }