in internal/fs/inode/file.go [276:328]
func (f *FileInode) ensureContent(ctx context.Context) (err error) {
if f.localFileCache {
// Fetch content from the cache after validating generation numbers again
// Generation validation first occurs at inode creation/destruction
cacheObjectKey := &contentcache.CacheObjectKey{BucketName: f.bucket.Name(), ObjectName: f.name.objectName}
if cacheObject, exists := f.contentCache.Get(cacheObjectKey); exists {
if cacheObject.ValidateGeneration(f.src.Generation, f.src.MetaGeneration) {
f.content = cacheObject.CacheFile
return
}
}
rc, err := f.openReader(ctx)
if err != nil {
err = fmt.Errorf("openReader Error: %w", err)
return err
}
// Insert object into content cache
tf, err := f.contentCache.AddOrReplace(cacheObjectKey, f.src.Generation, f.src.MetaGeneration, rc)
if err != nil {
err = fmt.Errorf("AddOrReplace cache error: %w", err)
return err
}
// Update state.
f.content = tf.CacheFile
} else {
// Local filecache is not enabled
if f.content != nil {
return
}
rc, err := f.openReader(ctx)
if err != nil {
err = fmt.Errorf("openReader Error: %w", err)
return err
}
if f.config.Write.EnableStreamingWrites {
logger.Infof("Falling back to staged write for '%s'. Streaming write is limited to sequential writes on new/empty files.", f.name)
}
tf, err := f.contentCache.NewTempFile(rc)
if err != nil {
err = fmt.Errorf("NewTempFile: %w", err)
return err
}
// Update state.
f.content = tf
}
return
}