internal/fs/inode/file.go (624 lines of code) (raw):
// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inode
import (
"errors"
"fmt"
"io"
"strconv"
"strings"
"syscall"
"time"
"github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/internal/bufferedwrites"
"github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache"
"github.com/googlecloudplatform/gcsfuse/v2/internal/fs/gcsfuse_errors"
"github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/syncutil"
"github.com/jacobsa/timeutil"
"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
)
// FileMtimeMetadataKey is the GCS object metadata key for file mtimes. mtimes
// are UTC, and are stored in the format defined by time.RFC3339Nano.
const FileMtimeMetadataKey = gcs.MtimeMetadataKey
// FileInode represents a file backed by (or destined for) a GCS object.
type FileInode struct {
	/////////////////////////
	// Dependencies
	/////////////////////////

	bucket     *gcsx.SyncerBucket
	mtimeClock timeutil.Clock

	/////////////////////////
	// Constant data
	/////////////////////////

	id    fuseops.InodeID
	name  Name
	attrs fuseops.InodeAttributes

	contentCache *contentcache.ContentCache
	// TODO (#640) remove bool flag and refactor contentCache to support two implementations:
	// one implementation with original functionality and one with new persistent disk content cache
	localFileCache bool

	/////////////////////////
	// Mutable state
	/////////////////////////

	// A mutex that must be held when calling certain methods. See documentation
	// for each method.
	mu syncutil.InvariantMutex

	// GUARDED_BY(mu)
	lc lookupCount

	// The source object from which this inode derives.
	//
	// INVARIANT: for non local files, src.Name == name.GcsObjectName()
	//
	// GUARDED_BY(mu)
	src gcs.MinObject

	// The current content of this inode, or nil if the source object is still
	// authoritative.
	content gcsx.TempFile

	// Has Destroy been called?
	//
	// GUARDED_BY(mu)
	destroyed bool

	// Represents a local file which is not yet synced to GCS.
	local bool

	// Represents if local file has been unlinked.
	unlinked bool

	// Wrapper object for multi range downloader. Needed as we will create the MRD in
	// random reader and we can't pass fileInode object to random reader as it
	// creates a cyclic dependency.
	// Todo: Investigate if cyclic dependency can be removed by removing some unused
	// code.
	MRDWrapper gcsx.MultiRangeDownloaderWrapper

	// Handler for streaming (buffered) writes; nil when no buffered write is in
	// progress.
	bwh    bufferedwrites.BufferedWriteHandler
	config *cfg.Config

	// Once write is started on the file i.e, bwh is initialized, any fileHandles
	// opened in write mode before or after this and not yet closed are considered
	// as writing to the file even though they are not writing.
	// In case of successful flush, we will set bwh to nil. But in case of error,
	// we will keep returning that error to all the fileHandles open during that time
	// and set bwh to nil after all fileHandlers are closed.
	// writeHandleCount tracks the count of open fileHandles in write mode.
	writeHandleCount int32

	// Limits the max number of blocks that can be created across file system when
	// streaming writes are enabled.
	globalMaxWriteBlocksSem *semaphore.Weighted
}

// Compile-time check that *FileInode satisfies the Inode interface.
var _ Inode = &FileInode{}
// NewFileInode creates a file inode for the given min object in GCS. The
// initial lookup count is zero.
//
// REQUIRES: m != nil
// REQUIRES: m.Generation > 0
// REQUIRES: m.MetaGeneration > 0
// REQUIRES: len(m.Name) > 0
// REQUIRES: m.Name[len(m.Name)-1] != '/'
func NewFileInode(
	id fuseops.InodeID,
	name Name,
	m *gcs.MinObject,
	attrs fuseops.InodeAttributes,
	bucket *gcsx.SyncerBucket,
	localFileCache bool,
	contentCache *contentcache.ContentCache,
	mtimeClock timeutil.Clock,
	localFile bool,
	cfg *cfg.Config,
	globalMaxBlocksSem *semaphore.Weighted) (f *FileInode) {
	// Set up the basic struct.
	// Note: despite the REQUIRES above, nil m is tolerated here (used for local
	// files that have no backing object yet) and yields a zero-valued MinObject.
	var minObj gcs.MinObject
	if m != nil {
		minObj = *m
	}
	f = &FileInode{
		bucket:                  bucket,
		mtimeClock:              mtimeClock,
		id:                      id,
		name:                    name,
		attrs:                   attrs,
		localFileCache:          localFileCache,
		contentCache:            contentCache,
		src:                     minObj,
		local:                   localFile,
		unlinked:                false,
		config:                  cfg,
		globalMaxWriteBlocksSem: globalMaxBlocksSem,
	}
	var err error
	// A wrapper construction failure is logged but not fatal; the inode is
	// still returned.
	f.MRDWrapper, err = gcsx.NewMultiRangeDownloaderWrapper(bucket, &minObj)
	if err != nil {
		logger.Errorf("NewFileInode: Error in creating MRDWrapper %v", err)
	}
	f.lc.Init(id)
	// Set up invariant checking.
	f.mu = syncutil.NewInvariantMutex(f.checkInvariants)
	return
}
////////////////////////////////////////////////////////////////////////
// Helpers
////////////////////////////////////////////////////////////////////////
// checkInvariants panics if any of the inode's documented invariants do not
// hold. Registered with the invariant mutex; a destroyed inode is exempt.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) checkInvariants() {
	if f.destroyed {
		return
	}
	// Make sure the name is legal.
	name := f.Name()
	if !name.IsFile() {
		panic("Illegal file name: " + name.String())
	}
	// INVARIANT: For non-local inodes, src.Name == name
	if !f.IsLocal() && f.src.Name != name.GcsObjectName() {
		panic(fmt.Sprintf(
			"Name mismatch: %q vs. %q",
			f.src.Name,
			name.GcsObjectName(),
		))
	}
	// INVARIANT: content.CheckInvariants() does not panic
	if f.content != nil {
		f.content.CheckInvariants()
	}
}
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) clobbered(ctx context.Context, forceFetchFromGcs bool, includeExtendedObjectAttributes bool) (o *gcs.Object, b bool, err error) {
// Stat the object in GCS. ForceFetchFromGcs ensures object is fetched from
// gcs and not cache.
req := &gcs.StatObjectRequest{
Name: f.name.GcsObjectName(),
ForceFetchFromGcs: forceFetchFromGcs,
ReturnExtendedObjectAttributes: includeExtendedObjectAttributes,
}
m, e, err := f.bucket.StatObject(ctx, req)
if includeExtendedObjectAttributes {
o = storageutil.ConvertMinObjectAndExtendedObjectAttributesToObject(m, e)
} else {
o = storageutil.ConvertMinObjectToObject(m)
}
// Special case: "not found" means we have been clobbered.
var notFoundErr *gcs.NotFoundError
if errors.As(err, ¬FoundErr) {
err = nil
if f.IsLocal() {
// For localFile, it is expected that object doesn't exist in GCS.
return
}
b = true
return
}
// Propagate other errors.
if err != nil {
err = fmt.Errorf("StatObject: %w", err)
return
}
// We are clobbered iff the generation doesn't match our source generation.
oGen := Generation{o.Generation, o.MetaGeneration, o.Size}
b = oGen.Compare(f.SourceGeneration()) != 0
return
}
// Open a reader for the generation of object we care about.
func (f *FileInode) openReader(ctx context.Context) (io.ReadCloser, error) {
rc, err := f.bucket.NewReaderWithReadHandle(
ctx,
&gcs.ReadObjectRequest{
Name: f.src.Name,
Generation: f.src.Generation,
ReadCompressed: f.src.HasContentEncodingGzip(),
})
// If the object with requested generation doesn't exist in GCS, it indicates
// a file clobbering scenario. This likely occurred because the file was
// modified/deleted leading to different generation number.
var notFoundError *gcs.NotFoundError
if errors.As(err, ¬FoundError) {
err = &gcsfuse_errors.FileClobberedError{
Err: fmt.Errorf("NewReader: %w", err),
}
}
if err != nil {
err = fmt.Errorf("NewReader: %w", err)
}
return rc, err
}
// ensureContent ensures that f.content exists and is not stale, faulting the
// object in from GCS if necessary.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) ensureContent(ctx context.Context) (err error) {
	if f.localFileCache {
		// Fetch content from the cache after validating generation numbers again
		// Generation validation first occurs at inode creation/destruction
		cacheObjectKey := &contentcache.CacheObjectKey{BucketName: f.bucket.Name(), ObjectName: f.name.objectName}
		if cacheObject, exists := f.contentCache.Get(cacheObjectKey); exists {
			if cacheObject.ValidateGeneration(f.src.Generation, f.src.MetaGeneration) {
				f.content = cacheObject.CacheFile
				return
			}
		}
		// Cache miss or stale generation: stream the object from GCS.
		rc, err := f.openReader(ctx)
		if err != nil {
			err = fmt.Errorf("openReader Error: %w", err)
			return err
		}
		// Insert object into content cache
		tf, err := f.contentCache.AddOrReplace(cacheObjectKey, f.src.Generation, f.src.MetaGeneration, rc)
		if err != nil {
			err = fmt.Errorf("AddOrReplace cache error: %w", err)
			return err
		}
		// Update state.
		f.content = tf.CacheFile
	} else {
		// Local filecache is not enabled
		if f.content != nil {
			return
		}
		rc, err := f.openReader(ctx)
		if err != nil {
			err = fmt.Errorf("openReader Error: %w", err)
			return err
		}
		// Reaching here with streaming writes enabled means we must fall back
		// to the temp-file (staged) write path; log that for visibility.
		if f.config.Write.EnableStreamingWrites {
			logger.Infof("Falling back to staged write for '%s'. Streaming write is limited to sequential writes on new/empty files.", f.name)
		}
		tf, err := f.contentCache.NewTempFile(rc)
		if err != nil {
			err = fmt.Errorf("NewTempFile: %w", err)
			return err
		}
		// Update state.
		f.content = tf
	}
	return
}
////////////////////////////////////////////////////////////////////////
// Public interface
////////////////////////////////////////////////////////////////////////
// Lock acquires the inode's mutex.
func (f *FileInode) Lock() {
	f.mu.Lock()
}

// Unlock releases the inode's mutex.
func (f *FileInode) Unlock() {
	f.mu.Unlock()
}

// ID returns the fuse inode ID.
func (f *FileInode) ID() fuseops.InodeID {
	return f.id
}

// Name returns the inode's name.
func (f *FileInode) Name() Name {
	return f.name
}

// IsLocal reports whether this file is local-only, i.e. not yet synced to GCS.
func (f *FileInode) IsLocal() bool {
	return f.local
}

// IsUnlinked reports whether this file has been unlinked.
func (f *FileInode) IsUnlinked() bool {
	return f.unlinked
}
// Unlink marks this file as unlinked, and forwards the unlink to the buffered
// write handler if one is active.
func (f *FileInode) Unlink() {
	f.unlinked = true
	if f.bwh != nil {
		f.bwh.Unlink()
	}
}
// Source returns a record for the GCS object from which this inode is branched. The
// record is guaranteed not to be modified, and users must not modify it.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Source() *gcs.MinObject {
	// Make a copy, since we modify f.src.
	o := f.src
	return &o
}
// SourceGenerationIsAuthoritative returns true when it is safe to serve reads
// directly from the object given by f.Source(), rather than calling f.ReadAt.
// Doing so may be more efficient, because f.ReadAt may cause the entire object
// to be faulted in and requires the inode to be locked during the read.
// SourceGenerationAuthoritative requires SyncPendingBufferedWrites method has
// been called on f within same inode lock for streaming writes with zonal
// bucket.
// TODO(b/406160290): Check if this can be improved.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) SourceGenerationIsAuthoritative() bool {
	// Source generation is authoritative if:
	// 1. No pending writes exists on the inode (both content and bwh are nil).
	return f.content == nil && f.bwh == nil
}
// SourceGeneration is equivalent to the generation returned by f.Source().
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) SourceGeneration() (g Generation) {
	g.Size = f.src.Size
	g.Object = f.src.Generation
	g.Metadata = f.src.MetaGeneration
	// If bwh is not nil, its size takes precedence as that is being actively
	// written to GCS.
	// Since temporary file does not write to GCS on the go, its size is not
	// used as source object's size.
	if f.bwh != nil {
		writeFileInfo := f.bwh.WriteFileInfo()
		g.Size = uint64(writeFileInfo.TotalSize)
	}
	return
}
// IncrementLookupCount bumps the kernel lookup count by one.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) IncrementLookupCount() {
	f.lc.Inc()
}

// DecrementLookupCount drops the kernel lookup count by n, reporting whether
// the inode should now be destroyed.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) DecrementLookupCount(n uint64) (destroy bool) {
	destroy = f.lc.Dec(n)
	return
}
// RegisterFileHandle records a newly opened file handle; only handles opened
// in write mode are counted.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) RegisterFileHandle(readOnly bool) {
	if !readOnly {
		f.writeHandleCount++
	}
}
// DeRegisterFileHandle records the closing of a file handle. When the last
// write handle is closed, any buffered write handler is destroyed, since no
// open handle can still be writing through it.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) DeRegisterFileHandle(readOnly bool) {
	if readOnly {
		return
	}
	// A non-positive count here indicates unbalanced register/deregister calls;
	// log it but still decrement to stay consistent with prior behavior.
	if f.writeHandleCount <= 0 {
		logger.Errorf("Mismatch in number of write file handles for inode :%d", f.id)
	}
	f.writeHandleCount--
	// All write fileHandles associated with bwh are closed. So safe to set bwh to nil.
	if f.writeHandleCount == 0 && f.bwh != nil {
		err := f.bwh.Destroy()
		if err != nil {
			logger.Warnf("Error while destroying the bufferedWritesHandler: %v", err)
		}
		f.bwh = nil
	}
}
// Destroy releases the inode's local resources: the content-cache entry when
// the local file cache is enabled, otherwise any temp-file content.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Destroy() (err error) {
	f.destroyed = true
	if f.localFileCache {
		cacheObjectKey := &contentcache.CacheObjectKey{BucketName: f.bucket.Name(), ObjectName: f.name.objectName}
		f.contentCache.Remove(cacheObjectKey)
	} else if f.content != nil {
		f.content.Destroy()
	}
	return
}
// Attributes returns the fuse attributes for this inode. The mtime is derived
// in increasing order of precedence from: the object's update time, the
// gsutil metadata key, the gcsfuse metadata key, local dirty content, and
// finally an active buffered write handler. Nlink is 0 when the file has been
// clobbered in GCS or locally unlinked.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Attributes(
	ctx context.Context) (attrs fuseops.InodeAttributes, err error) {
	attrs = f.attrs
	// Obtain default information from the source object.
	attrs.Mtime = f.src.Updated
	attrs.Size = f.src.Size
	// If the source object has an mtime metadata key, use that instead of its
	// update time.
	// If the file was copied via gsutil, we'll have goog-reserved-file-mtime
	if strTimestamp, ok := f.src.Metadata["goog-reserved-file-mtime"]; ok {
		// Parse error here is deliberately swallowed (shadowed err): we fall
		// back to the update time instead of failing the stat.
		if timestamp, err := strconv.ParseInt(strTimestamp, 0, 64); err == nil {
			attrs.Mtime = time.Unix(timestamp, 0)
		}
	}
	// Otherwise, if its been synced with gcsfuse before, we'll have gcsfuse_mtime
	if formatted, ok := f.src.Metadata["gcsfuse_mtime"]; ok {
		attrs.Mtime, err = time.Parse(time.RFC3339Nano, formatted)
		if err != nil {
			err = fmt.Errorf("time.Parse(%q): %w", formatted, err)
			return
		}
	}
	// If we've got local content, its size and (maybe) mtime take precedence.
	if f.content != nil {
		var sr gcsx.StatResult
		sr, err = f.content.Stat()
		if err != nil {
			err = fmt.Errorf("stat: %w", err)
			return
		}
		attrs.Size = uint64(sr.Size)
		if sr.Mtime != nil {
			attrs.Mtime = *sr.Mtime
		}
	}
	// An active buffered write handler takes precedence over everything else.
	if f.bwh != nil {
		writeFileInfo := f.bwh.WriteFileInfo()
		attrs.Mtime = writeFileInfo.Mtime
		attrs.Size = uint64(writeFileInfo.TotalSize)
	}
	// We require only that atime and ctime be "reasonable".
	attrs.Atime = attrs.Mtime
	attrs.Ctime = attrs.Mtime
	// If the object has been clobbered, we reflect that as the inode being
	// unlinked.
	_, clobbered, err := f.clobbered(ctx, false, false)
	if err != nil {
		err = fmt.Errorf("clobbered: %w", err)
		return
	}
	attrs.Nlink = 1
	// For local files, also checking if file is unlinked locally.
	if clobbered || (f.IsLocal() && f.IsUnlinked()) {
		attrs.Nlink = 0
	}
	return
}
// Bucket returns the syncer bucket backing this inode.
func (f *FileInode) Bucket() *gcsx.SyncerBucket {
	return f.bucket
}
// Read serves a read for this file with semantics matching io.ReaderAt.
//
// The caller may be better off reading directly from GCS when
// f.SourceGenerationIsAuthoritative() is true.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Read(
	ctx context.Context,
	dst []byte,
	offset int64) (n int, err error) {
	// Reads are unsupported while a buffered upload is in flight. bwh is
	// non-nil (with streaming writes enabled) in two scenarios:
	// 1. Local file
	// 2. Empty GCS files and writes are triggered via buffered flow.
	if f.bwh != nil {
		err = fmt.Errorf("cannot read a file when upload in progress: %w", syscall.ENOTSUP)
		return
	}
	// Fault the content in so that f.content is non-nil.
	if err = f.ensureContent(ctx); err != nil {
		err = fmt.Errorf("ensureContent: %w", err)
		return
	}
	// Read from the local content. io.EOF is passed through untouched; every
	// other error gets wrapped with context.
	n, err = f.content.ReadAt(dst, offset)
	if err != nil && err != io.EOF {
		err = fmt.Errorf("content.ReadAt: %w", err)
	}
	return
}
// Write serves a write for this file with semantics matching
// fuseops.WriteFileOp, dispatching to the buffered-writes path when a handler
// is active and to the temp-file path otherwise.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Write(
	ctx context.Context,
	data []byte,
	offset int64) error {
	if f.bwh == nil {
		return f.writeUsingTempFile(ctx, data, offset)
	}
	return f.writeUsingBufferedWrites(ctx, data, offset)
}
// writeUsingTempFile serves a write for this file by staging the data in the
// local temp file.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) writeUsingTempFile(ctx context.Context, data []byte, offset int64) (err error) {
	// Make sure f.content != nil.
	err = f.ensureContent(ctx)
	if err != nil {
		err = fmt.Errorf("ensureContent: %w", err)
		return
	}
	// Write to the mutable content. Note that io.WriterAt guarantees it returns
	// an error for short writes.
	_, err = f.content.WriteAt(data, offset)
	return
}
// writeUsingBufferedWrites serves a write through the buffered writes handler.
// A precondition failure is surfaced as FileClobberedError. An out-of-order
// write triggers a fallback: the data written so far is finalized to GCS and
// the write is retried via the temp-file path.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) writeUsingBufferedWrites(ctx context.Context, data []byte, offset int64) error {
	err := f.bwh.Write(data, offset)
	var preconditionErr *gcs.PreconditionError
	if errors.As(err, &preconditionErr) {
		return &gcsfuse_errors.FileClobberedError{
			Err: fmt.Errorf("f.bwh.Write(): %w", err),
		}
	}
	if err != nil && !errors.Is(err, bufferedwrites.ErrOutOfOrderWrite) {
		return fmt.Errorf("write to buffered write handler failed: %w", err)
	}
	// Fall back to temp file for Out-Of-Order Writes.
	if errors.Is(err, bufferedwrites.ErrOutOfOrderWrite) {
		logger.Infof("Out-of-order write detected. Falling back to temporary file on disk.")
		// Finalize the object.
		err = f.flushUsingBufferedWriteHandler()
		if err != nil {
			return fmt.Errorf("could not finalize what has been written so far: %w", err)
		}
		return f.writeUsingTempFile(ctx, data, offset)
	}
	return err
}
// flushUsingBufferedWriteHandler flushes the buffered writes handler
// (finalizing the object) and updates inode state with the new object.
// A precondition failure is surfaced as FileClobberedError.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) flushUsingBufferedWriteHandler() error {
	obj, err := f.bwh.Flush()
	var preconditionErr *gcs.PreconditionError
	if errors.As(err, &preconditionErr) {
		return &gcsfuse_errors.FileClobberedError{
			Err: fmt.Errorf("f.bwh.Flush(): %w", err),
		}
	}
	if err != nil {
		return fmt.Errorf("f.bwh.Flush(): %w", err)
	}
	// If we finalized the object, we need to update our state.
	f.updateInodeStateAfterFlush(obj)
	return nil
}
// SyncPendingBufferedWrites flushes any pending writes on the bwh to GCS.
// It is a no-op when bwh is nil.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) SyncPendingBufferedWrites() (gcsSynced bool, err error) {
	if f.bwh == nil {
		return
	}
	minObj, err := f.bwh.Sync()
	// A precondition failure means the backing object changed underneath us.
	var preconditionErr *gcs.PreconditionError
	if errors.As(err, &preconditionErr) {
		err = &gcsfuse_errors.FileClobberedError{
			Err: fmt.Errorf("f.bwh.Sync(): %w", err),
		}
		return
	}
	if err != nil {
		err = fmt.Errorf("f.bwh.Sync(): %w", err)
		return
	}
	// We return gcsSynced as true when we get minObject from Sync() for Zonal Buckets.
	// For Non-Zonal Buckets minObj is always nil.
	gcsSynced = minObj != nil
	// If we flushed out object, we need to update our state.
	f.updateInodeStateAfterSync(minObj)
	return
}
// Set the mtime for this file. May involve a round trip to GCS.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) SetMtime(
ctx context.Context,
mtime time.Time) (err error) {
if f.IsUnlinked() {
// No need to update mtime on GCS for unlinked file.
return
}
// When bufferedWritesHandler instance is not nil, set time on bwh.
// It will not be nil in 2 cases when bufferedWrites are enabled:
// 1. local files
// 2. After first write on empty GCS files.
if f.bwh != nil {
f.bwh.SetMtime(mtime)
return
}
// If we have a local temp file, stat it.
var sr gcsx.StatResult
if f.content != nil {
sr, err = f.content.Stat()
if err != nil {
err = fmt.Errorf("stat: %w", err)
return
}
}
// 1. If the local content is dirty, simply update its mtime and return. This
// will cause the object in the bucket to be updated once we sync. If we lose
// power or something the mtime update will be lost, but so will the file
// data modifications so this doesn't seem so bad. It's worth saving the
// round trip to GCS for the common case of Linux writeback caching, where we
// always receive a setattr request just before a flush of a dirty file.
//
// 2. If the file is local, that means its not yet synced to GCS. Just update
// the mtime locally, it will be synced when the object is created on GCS.
if sr.Mtime != nil || f.IsLocal() {
f.content.SetMtime(mtime)
return
}
// Otherwise, update the backing object's metadata.
formatted := mtime.UTC().Format(time.RFC3339Nano)
srcGen := f.SourceGeneration()
req := &gcs.UpdateObjectRequest{
Name: f.src.Name,
Generation: srcGen.Object,
MetaGenerationPrecondition: &srcGen.Metadata,
Metadata: map[string]*string{
FileMtimeMetadataKey: &formatted,
},
}
o, err := f.bucket.UpdateObject(ctx, req)
if err == nil {
var minObj gcs.MinObject
minObjPtr := storageutil.ConvertObjToMinObject(o)
if minObjPtr != nil {
minObj = *minObjPtr
}
f.src = minObj
f.updateMRDWrapper()
return
}
var notFoundErr *gcs.NotFoundError
if errors.As(err, ¬FoundErr) {
// Special case: silently ignore not found errors, which mean the file has
// been unlinked.
err = nil
return
}
var preconditionErr *gcs.PreconditionError
if errors.As(err, &preconditionErr) {
// Special case: silently ignore precondition errors, which we also take to
// mean the file has been unlinked.
err = nil
return
}
err = fmt.Errorf("UpdateObject: %w", err)
return
}
// fetchLatestGcsObject stats the backing object with full projection and
// returns it, or FileClobberedError when the live generation no longer
// matches ours.
func (f *FileInode) fetchLatestGcsObject(ctx context.Context) (*gcs.Object, error) {
	// When listObjects call is made, we fetch data with projection set as noAcl
	// which means acls and owner properties are not returned. So the f.src object
	// here will not have acl information even though there are acls present on
	// the gcsObject.
	// Hence, we are making an explicit gcs stat call to fetch the latest
	// properties and using that when object is synced below. StatObject by
	// default sets the projection to full, which fetches all the object
	// properties.
	latestGcsObj, isClobbered, err := f.clobbered(ctx, true, true)
	if err != nil {
		return nil, err
	}
	if isClobbered {
		return nil, &gcsfuse_errors.FileClobberedError{
			Err: fmt.Errorf("file was clobbered"),
		}
	}
	return latestGcsObj, nil
}
// Sync writes out contents to GCS. If this fails due to the generation
// having been clobbered, failure is propagated back to the calling
// function as an error.
//
// For buffered writes, this method only waits for any partial buffers to be
// uploaded to GCS. It does not guarantee that the entire contents of the file
// have been persisted.
//
// For non-buffered writes, this method writes the entire contents to GCS.
// If this method succeeds, SourceGeneration will return the new generation by
// which this inode should be known (which may be the same as before). If it
// fails, the generation will not change.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Sync(ctx context.Context) (gcsSynced bool, err error) {
	// If we have not been dirtied, there is nothing to do.
	if f.content == nil && f.bwh == nil {
		return
	}
	// Buffered-writes path: only partial buffers are pushed.
	if f.bwh != nil {
		gcsSynced, err = f.SyncPendingBufferedWrites()
		if err != nil {
			err = fmt.Errorf("could not sync what has been written so far: %w", err)
		}
		return
	}
	// Temp-file path: the full contents are written out.
	err = f.syncUsingContent(ctx)
	if err != nil {
		return false, err
	}
	return true, nil
}
// syncUsingContent syncs the inode content to GCS. It fetches the latest GCS
// object, syncs the content and updates the inode state.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) syncUsingContent(ctx context.Context) error {
	// Local files have no backing object to fetch; latestGcsObj stays nil.
	var latestGcsObj *gcs.Object
	if !f.local {
		var err error
		latestGcsObj, err = f.fetchLatestGcsObject(ctx)
		if err != nil {
			return err
		}
	}
	// Write out the contents if they are dirty.
	// Object properties are also synced as part of content sync. Hence, passing
	// the latest object fetched from gcs which has all the properties populated.
	newObj, err := f.bucket.SyncObject(ctx, f.Name().GcsObjectName(), latestGcsObj, f.content)
	// A precondition failure means the object changed underneath us.
	var preconditionErr *gcs.PreconditionError
	if errors.As(err, &preconditionErr) {
		return &gcsfuse_errors.FileClobberedError{
			Err: fmt.Errorf("SyncObject: %w", err),
		}
	}
	// Propagate other errors.
	if err != nil {
		return fmt.Errorf("SyncObject: %w", err)
	}
	minObj := storageutil.ConvertObjToMinObject(newObj)
	// If we wrote out a new object, we need to update our state.
	f.updateInodeStateAfterFlush(minObj)
	return nil
}
// Flush writes out contents to GCS. If this fails due to the generation
// having been clobbered, failure is propagated back to the calling
// function as an error.
//
// After this method succeeds, SourceGeneration will return the new generation
// by which this inode should be known (which may be the same as before). If it
// fails, the generation will not change.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Flush(ctx context.Context) (err error) {
	// If we have not been dirtied, there is nothing to do.
	if f.content == nil && f.bwh == nil {
		return
	}
	// Flush using the appropriate method based on whether we're using a
	// buffered write handler.
	if f.bwh != nil {
		return f.flushUsingBufferedWriteHandler()
	}
	return f.syncUsingContent(ctx)
}
// updateInodeStateAfterFlush updates the inode state after the object has
// been finalized on GCS: the buffered write handler (if any) is dropped, then
// the common post-sync state update runs. A nil minObj or an enabled local
// file cache leaves state untouched, matching updateInodeStateAfterSync.
//
// Fix: removed the redundant `if f.bwh != nil` guard around `f.bwh = nil`
// (assigning nil is a no-op when already nil) and the "as as" comment typo.
func (f *FileInode) updateInodeStateAfterFlush(minObj *gcs.MinObject) {
	if minObj != nil && !f.localFileCache {
		// Set bwh to nil as the object has been finalized.
		f.bwh = nil
		f.updateInodeStateAfterSync(minObj)
	}
}
// updateInodeStateAfterSync records a newly written-out object as the inode's
// source: updates f.src, refreshes the MRD wrapper, converts a local file to
// non-local, and discards any temp-file content. A nil minObj or an enabled
// local file cache leaves state untouched.
//
// Fix: replaced the redundant `if f.IsLocal() { f.local = false }` with an
// unconditional assignment (a no-op when already false).
func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
	if minObj != nil && !f.localFileCache {
		f.src = *minObj
		// Update MRDWrapper
		f.updateMRDWrapper()
		// Convert localFile to nonLocalFile after it is synced to GCS.
		f.local = false
		if f.content != nil {
			f.content.Destroy()
			f.content = nil
		}
	}
}
// updateMRDWrapper updates the min object stored in MRDWrapper corresponding
// to the inode. Should be called whenever the minObject associated with the
// inode is updated. A failure is logged but not propagated.
func (f *FileInode) updateMRDWrapper() {
	err := f.MRDWrapper.SetMinObject(f.Source())
	if err != nil {
		logger.Errorf("FileInode::updateMRDWrapper Error in setting minObject %v", err)
	}
}
// Truncate truncates the file to the specified size, delegating to the
// buffered write handler when one is active and to the temp-file content
// otherwise.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Truncate(
	ctx context.Context,
	size int64) (err error) {
	if f.bwh != nil {
		return f.bwh.Truncate(size)
	}
	// Make sure f.content != nil.
	err = f.ensureContent(ctx)
	if err != nil {
		err = fmt.Errorf("ensureContent: %w", err)
		return
	}
	// Call through.
	err = f.content.Truncate(size)
	return
}
// CacheEnsureContent ensures cache content on read if the local content cache
// is enabled; otherwise it is a no-op.
func (f *FileInode) CacheEnsureContent(ctx context.Context) (err error) {
	if f.localFileCache {
		err = f.ensureContent(ctx)
	}
	return
}
// CreateEmptyTempFile creates an empty temp file with no contents when
// streaming writes are not enabled. It is a no-op when a buffered write
// handler is active or the temp file already exists.
//
// Fix: the error from NewTempFile was previously ignored and SetMtime was
// called unconditionally, which would dereference a nil f.content when temp
// file creation failed. The error is now checked first.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) CreateEmptyTempFile(ctx context.Context) (err error) {
	// Skip creating empty temp file when streaming writes are enabled
	// or temp file is already created.
	if f.bwh != nil || f.content != nil {
		return
	}
	// Creating a file with no contents. The contents will be updated with
	// writeFile operations.
	f.content, err = f.contentCache.NewTempFile(io.NopCloser(strings.NewReader("")))
	if err != nil {
		err = fmt.Errorf("NewTempFile: %w", err)
		return
	}
	// Setting the initial mtime to creation time.
	f.content.SetMtime(f.mtimeClock.Now())
	return
}
// InitBufferedWriteHandlerIfEligible initializes the buffered write handler
// if the file inode is eligible (streaming writes enabled, object empty, no
// temp file in use) and returns true exactly when a new handler was created.
//
// Fix: removed the redundant inner `if f.bwh == nil` check — the early return
// at the top already guarantees bwh is nil past that point.
func (f *FileInode) InitBufferedWriteHandlerIfEligible(ctx context.Context) (bool, error) {
	// bwh already initialized, do nothing.
	if f.bwh != nil {
		return false, nil
	}
	tempFileInUse := f.content != nil
	if f.src.Size != 0 || !f.config.Write.EnableStreamingWrites || tempFileInUse {
		// bwh should not be initialized under these conditions.
		return false, nil
	}
	// For non-local files, fetch the latest object so the handler can honor
	// generation preconditions; local files have no backing object yet.
	var latestGcsObj *gcs.Object
	var err error
	if !f.local {
		latestGcsObj, err = f.fetchLatestGcsObject(ctx)
		if err != nil {
			return false, err
		}
	}
	f.bwh, err = bufferedwrites.NewBWHandler(&bufferedwrites.CreateBWHandlerRequest{
		Object:                   latestGcsObj,
		ObjectName:               f.name.GcsObjectName(),
		Bucket:                   f.bucket,
		BlockSize:                f.config.Write.BlockSizeMb,
		MaxBlocksPerFile:         f.config.Write.MaxBlocksPerFile,
		GlobalMaxBlocksSem:       f.globalMaxWriteBlocksSem,
		ChunkTransferTimeoutSecs: f.config.GcsRetries.ChunkTransferTimeoutSecs,
	})
	if err != nil {
		return false, fmt.Errorf("failed to create bufferedWriteHandler: %w", err)
	}
	f.bwh.SetMtime(f.mtimeClock.Now())
	return true, nil
}