internal/storage/caching/fast_stat_bucket.go (325 lines of code) (raw):
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package caching
import (
"fmt"
"strings"
"sync"
"time"
"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/metadata"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"golang.org/x/net/context"
"github.com/jacobsa/timeutil"
)
// NewFastStatBucket returns a gcs.Bucket that forwards every operation to
// wrapped while keeping the supplied stat cache in sync with what it sees.
// Records are invalidated when modifications are made through this bucket.
// Entries for existing objects and folders expire primaryCacheTTL after they
// are written; negative entries (names known not to exist) expire after
// negativeCacheTTL. Expirations are computed from clock.
func NewFastStatBucket(
	primaryCacheTTL time.Duration,
	cache metadata.StatCache,
	clock timeutil.Clock,
	wrapped gcs.Bucket,
	negativeCacheTTL time.Duration,
) (b gcs.Bucket) {
	return &fastStatBucket{
		wrapped:          wrapped,
		cache:            cache,
		clock:            clock,
		primaryCacheTTL:  primaryCacheTTL,
		negativeCacheTTL: negativeCacheTTL,
	}
}
// fastStatBucket is a gcs.Bucket decorator. It answers stat requests from a
// metadata.StatCache when it can, and updates that cache around the mutations
// it forwards to the wrapped bucket.
type fastStatBucket struct {
	// mu must be held for every access to cache.
	mu sync.Mutex

	/////////////////////////
	// Dependencies
	/////////////////////////

	// GUARDED_BY(mu)
	cache   metadata.StatCache
	clock   timeutil.Clock
	wrapped gcs.Bucket

	/////////////////////////
	// Constant data
	/////////////////////////

	// primaryCacheTTL is the lifetime of cache entries for existing files and
	// folders.
	primaryCacheTTL time.Duration

	// negativeCacheTTL is the lifetime of cache entries for non-existing files
	// and folders.
	negativeCacheTTL time.Duration
}
////////////////////////////////////////////////////////////////////////
// Helpers
////////////////////////////////////////////////////////////////////////
// insertMultiple converts each object to a MinObject and caches it. Every
// entry expires primaryCacheTTL after the current clock time.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) insertMultiple(objs []*gcs.Object) {
	b.mu.Lock()
	defer b.mu.Unlock()

	deadline := b.clock.Now().Add(b.primaryCacheTTL)
	for i := range objs {
		b.cache.Insert(storageutil.ConvertObjToMinObject(objs[i]), deadline)
	}
}
// insertMultipleMinObjects caches each MinObject as-is. Every entry expires
// primaryCacheTTL after the current clock time.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) insertMultipleMinObjects(minObjs []*gcs.MinObject) {
	b.mu.Lock()
	defer b.mu.Unlock()

	deadline := b.clock.Now().Add(b.primaryCacheTTL)
	for i := range minObjs {
		b.cache.Insert(minObjs[i], deadline)
	}
}
// eraseEntriesWithGivenPrefix forwards folderName to the cache's
// EraseEntriesWithGivenPrefix while holding the cache lock.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) eraseEntriesWithGivenPrefix(folderName string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	prefix := folderName
	b.cache.EraseEntriesWithGivenPrefix(prefix)
}
// insertHierarchicalListing caches the result of a listing on a hierarchical
// bucket:
//   - each listed MinObject whose name does not end in "/" is cached as an
//     object. Names ending in "/" are the zero-byte objects that stand for
//     folders, and they are skipped.
//   - each collapsed-run prefix is cached as a folder, since every prefix is
//     a folder in a hierarchical bucket. A prefix without the trailing "/" is
//     logged and otherwise ignored.
//
// Every entry expires primaryCacheTTL after the current clock time.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) insertHierarchicalListing(listing *gcs.Listing) {
	b.mu.Lock()
	defer b.mu.Unlock()

	deadline := b.clock.Now().Add(b.primaryCacheTTL)

	for _, obj := range listing.MinObjects {
		if strings.HasSuffix(obj.Name, "/") {
			continue
		}
		b.cache.Insert(obj, deadline)
	}

	for _, prefix := range listing.CollapsedRuns {
		if strings.HasSuffix(prefix, "/") {
			b.cache.InsertFolder(&gcs.Folder{Name: prefix}, deadline)
			continue
		}
		// Log the malformed prefix but don't fail the operation.
		logger.Errorf("error in prefix name: %s", prefix)
	}
}
// insert caches a single object by passing it to insertMultiple as a
// one-element batch.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) insert(o *gcs.Object) {
	batch := []*gcs.Object{o}
	b.insertMultiple(batch)
}
// insertMinObject caches a single MinObject by passing it to
// insertMultipleMinObjects as a one-element batch.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) insertMinObject(o *gcs.MinObject) {
	batch := []*gcs.MinObject{o}
	b.insertMultipleMinObjects(batch)
}
// insertFolder caches f. The entry expires primaryCacheTTL after the current
// clock time.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) insertFolder(f *gcs.Folder) {
	b.mu.Lock()
	defer b.mu.Unlock()

	deadline := b.clock.Now().Add(b.primaryCacheTTL)
	b.cache.InsertFolder(f, deadline)
}
// addNegativeEntry records that no object called name exists. The entry
// expires negativeCacheTTL after the current clock time.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) addNegativeEntry(name string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.cache.AddNegativeEntry(name, b.clock.Now().Add(b.negativeCacheTTL))
}
// addNegativeEntryForFolder records that no folder called name exists. The
// entry expires negativeCacheTTL after the current clock time.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) addNegativeEntryForFolder(name string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.cache.AddNegativeEntryForFolder(name, b.clock.Now().Add(b.negativeCacheTTL))
}
// invalidate asks the cache to erase whatever record it holds for name, so
// the next lookup for that name misses.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) invalidate(name string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	key := name
	b.cache.Erase(key)
}
// lookUp queries the cache for name as of the current clock time. hit reports
// whether the cache had an entry. A hit with a nil m is a negative entry,
// meaning the object is known not to exist.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) lookUp(name string) (hit bool, m *gcs.MinObject) {
	b.mu.Lock()
	defer b.mu.Unlock()

	now := b.clock.Now()
	return b.cache.LookUp(name, now)
}
// lookUpFolder queries the cache for the folder name as of the current clock
// time. The boolean reports a hit. A hit with a nil folder is a negative
// entry.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) lookUpFolder(name string) (bool, *gcs.Folder) {
	b.mu.Lock()
	defer b.mu.Unlock()

	now := b.clock.Now()
	return b.cache.LookUpFolder(name, now)
}
////////////////////////////////////////////////////////////////////////
// Bucket interface
////////////////////////////////////////////////////////////////////////
// Name reports the name of the wrapped bucket.
func (b *fastStatBucket) Name() string {
	inner := b.wrapped
	return inner.Name()
}
// BucketType reports the type of the wrapped bucket.
func (b *fastStatBucket) BucketType() gcs.BucketType {
	inner := b.wrapped
	return inner.BucketType()
}
// NewReaderWithReadHandle passes the read straight to the wrapped bucket. The
// stat cache is neither consulted nor modified.
func (b *fastStatBucket) NewReaderWithReadHandle(
	ctx context.Context,
	req *gcs.ReadObjectRequest) (rd gcs.StorageReader, err error) {
	return b.wrapped.NewReaderWithReadHandle(ctx, req)
}
// CreateObject erases any cached record for req.Name, creates the object
// through the wrapped bucket and, on success, caches the returned object.
// On failure the wrapped bucket's results are returned unchanged.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) CreateObject(
	ctx context.Context,
	req *gcs.CreateObjectRequest) (o *gcs.Object, err error) {
	// Throw away any existing record for this object.
	b.invalidate(req.Name)

	// TODO: create object to be replaced with create folder api once integrated
	created, createErr := b.wrapped.CreateObject(ctx, req)
	if createErr != nil {
		return created, createErr
	}

	// Record the new object.
	b.insert(created)
	return created, nil
}
// CreateObjectChunkWriter passes the request straight to the wrapped bucket.
// The cache is not touched here. This type updates it in FinalizeUpload and
// FlushPendingWrites.
func (b *fastStatBucket) CreateObjectChunkWriter(
	ctx context.Context,
	req *gcs.CreateObjectRequest,
	chunkSize int,
	callBack func(bytesUploadedSoFar int64),
) (gcs.Writer, error) {
	w, err := b.wrapped.CreateObjectChunkWriter(ctx, req, chunkSize, callBack)
	return w, err
}
// FinalizeUpload erases any cached record for the writer's object and
// finalizes the upload through the wrapped bucket. On success it caches the
// resulting MinObject. The wrapped bucket's results are returned unchanged.
func (b *fastStatBucket) FinalizeUpload(ctx context.Context, writer gcs.Writer) (*gcs.MinObject, error) {
	// Throw away any existing record for this object.
	b.invalidate(writer.ObjectName())

	minObj, err := b.wrapped.FinalizeUpload(ctx, writer)
	if err != nil {
		return minObj, err
	}

	// Record the new object.
	b.insertMinObject(minObj)
	return minObj, nil
}
// FlushPendingWrites erases any cached record for the writer's object and
// flushes through the wrapped bucket. On success it caches the resulting
// MinObject. The wrapped bucket's results are returned unchanged.
func (b *fastStatBucket) FlushPendingWrites(ctx context.Context, writer gcs.Writer) (*gcs.MinObject, error) {
	// Throw away any existing record for this object.
	b.invalidate(writer.ObjectName())

	minObj, err := b.wrapped.FlushPendingWrites(ctx, writer)
	if err != nil {
		return minObj, err
	}

	// Record the new object.
	b.insertMinObject(minObj)
	return minObj, nil
}
// CopyObject erases any cached record for req.DstName and copies the object
// through the wrapped bucket. On success it caches the new destination
// version. The source's cache entry is not touched.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) CopyObject(
	ctx context.Context,
	req *gcs.CopyObjectRequest) (o *gcs.Object, err error) {
	// Throw away any existing record for the destination name.
	b.invalidate(req.DstName)

	copied, copyErr := b.wrapped.CopyObject(ctx, req)
	if copyErr != nil {
		return copied, copyErr
	}

	// Record the new version.
	b.insert(copied)
	return copied, nil
}
// ComposeObjects erases any cached record for req.DstName and composes the
// sources into it through the wrapped bucket. On success it caches the new
// destination version.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) ComposeObjects(
	ctx context.Context,
	req *gcs.ComposeObjectsRequest) (o *gcs.Object, err error) {
	// Throw away any existing record for the destination name.
	b.invalidate(req.DstName)

	composed, composeErr := b.wrapped.ComposeObjects(ctx, req)
	if composeErr != nil {
		return composed, composeErr
	}

	// Record the new version.
	b.insert(composed)
	return composed, nil
}
// StatObject returns metadata for req.Name.
//
// When req.ForceFetchFromGcs is set, the cache is bypassed and GCS is asked
// directly; the result is still written to the cache. Extended attributes
// are returned only if req.ReturnExtendedObjectAttributes is also set.
//
// Otherwise the cache is consulted first:
//   - a positive hit is returned with nil ExtendedObjectAttributes;
//   - a negative hit yields a *gcs.NotFoundError;
//   - a miss falls through to GCS.
//
// StatObject panics if ReturnExtendedObjectAttributes is requested without
// ForceFetchFromGcs, because the cache only holds MinObjects.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) StatObject(
	ctx context.Context,
	req *gcs.StatObjectRequest) (m *gcs.MinObject, e *gcs.ExtendedObjectAttributes, err error) {
	switch {
	case req.ReturnExtendedObjectAttributes && !req.ForceFetchFromGcs:
		panic("invalid StatObjectRequest: ForceFetchFromGcs: false and ReturnExtendedObjectAttributes: true")

	case req.ForceFetchFromGcs:
		m, e, err = b.StatObjectFromGcs(ctx, req)
		if !req.ReturnExtendedObjectAttributes {
			e = nil
		}
		return m, e, err
	}

	hit, entry := b.lookUp(req.Name)
	if !hit {
		return b.StatObjectFromGcs(ctx, req)
	}

	// A nil entry is a negative cache entry.
	if entry == nil {
		return nil, nil, &gcs.NotFoundError{
			Err: fmt.Errorf("negative cache entry for %v", req.Name),
		}
	}

	// Positive hit: MinObject only, no ExtendedObjectAttributes.
	return entry, nil, nil
}
// ListObjects lists through the wrapped bucket and caches what it finds.
//
// For hierarchical buckets, folder-placeholder objects are skipped and
// prefixes are cached as folders (see insertHierarchicalListing). For other
// buckets every listed MinObject is cached. On error nothing is cached.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) ListObjects(
	ctx context.Context,
	req *gcs.ListObjectsRequest) (listing *gcs.Listing, err error) {
	listing, err = b.wrapped.ListObjects(ctx, req)
	if err != nil {
		return listing, err
	}

	// Note anything we found.
	if b.BucketType().Hierarchical {
		b.insertHierarchicalListing(listing)
	} else {
		b.insertMultipleMinObjects(listing.MinObjects)
	}
	return listing, nil
}
// UpdateObject erases any cached record for req.Name and applies the update
// through the wrapped bucket. On success it caches the new version.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) UpdateObject(
	ctx context.Context,
	req *gcs.UpdateObjectRequest) (o *gcs.Object, err error) {
	// Throw away any existing record for this object.
	b.invalidate(req.Name)

	updated, updateErr := b.wrapped.UpdateObject(ctx, req)
	if updateErr != nil {
		return updated, updateErr
	}

	// Record the new version.
	b.insert(updated)
	return updated, nil
}
// DeleteObject deletes req.Name through the wrapped bucket.
//
// On success a negative entry is cached for the name. On failure any cached
// record is erased, so the next lookup goes to GCS instead of trusting a
// possibly stale entry.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) DeleteObject(
	ctx context.Context,
	req *gcs.DeleteObjectRequest) (err error) {
	if err = b.wrapped.DeleteObject(ctx, req); err == nil {
		b.addNegativeEntry(req.Name)
		return nil
	}

	b.invalidate(req.Name)
	return err
}
// MoveObject moves req.SrcName to req.DstName through the wrapped bucket and
// updates the cache to match.
//
// Cached records for both names are erased before the move. On success:
//   - the source is recorded as a negative entry, since it no longer exists
//     (as after DeleteObject);
//   - the returned object is cached.
//
// Writing the source entry after the move also overwrites any positive entry
// that a concurrent StatObject may have cached for the source while the move
// was in flight. Without it, that stale entry would survive for the full
// primary TTL. On failure the error is returned and nothing further is cached.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) MoveObject(ctx context.Context, req *gcs.MoveObjectRequest) (*gcs.Object, error) {
	// Throw away any existing record for the source and destination name.
	b.invalidate(req.SrcName)
	b.invalidate(req.DstName)

	// Move the object.
	o, err := b.wrapped.MoveObject(ctx, req)
	if err != nil {
		return nil, err
	}

	// The source is gone. Record that before inserting the new object, so a
	// later insert for the same name takes precedence if the two coincide.
	b.addNegativeEntry(req.SrcName)

	// Record the new version.
	b.insert(o)
	return o, nil
}
// DeleteFolder deletes folderName through the wrapped bucket.
//
// On success a negative folder entry is cached. On failure the cached entry
// is erased, so gcsfuse does not keep a possibly erroneous status for the
// folder and the next call probes GCS for the latest state.
func (b *fastStatBucket) DeleteFolder(ctx context.Context, folderName string) error {
	if err := b.wrapped.DeleteFolder(ctx, folderName); err != nil {
		b.invalidate(folderName)
		return err
	}

	b.addNegativeEntryForFolder(folderName)
	return nil
}
// StatObjectFromGcs stats req.Name through the wrapped bucket, ignoring any
// cached record, and refreshes the cache with the outcome:
//   - on success the returned MinObject is cached;
//   - on *gcs.NotFoundError a negative entry is cached;
//   - any other error leaves the cache untouched.
//
// The wrapped bucket's results are always returned unchanged.
//
// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) StatObjectFromGcs(ctx context.Context,
	req *gcs.StatObjectRequest) (m *gcs.MinObject, e *gcs.ExtendedObjectAttributes, err error) {
	m, e, err = b.wrapped.StatObject(ctx, req)
	if err != nil {
		// Special case: NotFoundError -> negative entry.
		if _, ok := err.(*gcs.NotFoundError); ok {
			b.addNegativeEntry(req.Name)
		}
		return m, e, err
	}

	// The wrapped bucket already returns a MinObject, which is what the cache
	// stores. Insert it directly instead of converting to gcs.Object and back:
	// that round trip cost two throwaway allocations per stat. This matches how
	// FinalizeUpload, FlushPendingWrites and ListObjects populate the cache.
	b.insertMinObject(m)
	return m, e, nil
}
// GetFolder returns the folder named prefix, checking the cache first.
//
// A negative cache entry yields a *gcs.NotFoundError. A cache miss is
// resolved against GCS, and the outcome is cached there.
func (b *fastStatBucket) GetFolder(ctx context.Context, prefix string) (*gcs.Folder, error) {
	hit, entry := b.lookUpFolder(prefix)
	switch {
	case !hit:
		// Fetch the Folder from GCS.
		return b.getFolderFromGCS(ctx, prefix)

	case entry == nil:
		// Negative entries result in NotFoundError.
		return nil, &gcs.NotFoundError{
			Err: fmt.Errorf("negative cache entry for folder %v", prefix),
		}

	default:
		return entry, nil
	}
}
// getFolderFromGCS fetches prefix through the wrapped bucket and caches the
// outcome:
//   - on success the folder is cached;
//   - on *gcs.NotFoundError a negative folder entry is cached;
//   - any other error leaves the cache untouched.
func (b *fastStatBucket) getFolderFromGCS(ctx context.Context, prefix string) (*gcs.Folder, error) {
	folder, err := b.wrapped.GetFolder(ctx, prefix)
	if err != nil {
		// Special case: NotFoundError -> negative entry.
		if _, notFound := err.(*gcs.NotFoundError); notFound {
			b.addNegativeEntryForFolder(prefix)
		}
		return nil, err
	}

	b.insertFolder(folder)
	return folder, nil
}
// CreateFolder erases any cached record for folderName and creates the folder
// through the wrapped bucket. On success it caches the new folder.
func (b *fastStatBucket) CreateFolder(ctx context.Context, folderName string) (f *gcs.Folder, err error) {
	// Throw away any existing record for this folder.
	b.invalidate(folderName)

	if f, err = b.wrapped.CreateFolder(ctx, folderName); err != nil {
		return f, err
	}

	// Record the new folder.
	b.insertFolder(f)
	return f, nil
}
// RenameFolder renames folderName to destinationFolderId through the wrapped
// bucket.
//
// On success, cached entries whose names start with folderName are erased and
// the returned destination folder is cached. On failure the cache is left as
// it was.
func (b *fastStatBucket) RenameFolder(ctx context.Context, folderName string, destinationFolderId string) (*gcs.Folder, error) {
	renamed, err := b.wrapped.RenameFolder(ctx, folderName, destinationFolderId)
	if err != nil {
		return nil, err
	}

	// Invalidate the old directory, then insert the destination folder.
	b.eraseEntriesWithGivenPrefix(folderName)
	b.insertFolder(renamed)
	return renamed, nil
}
// NewMultiRangeDownloader passes the request straight to the wrapped bucket.
// The stat cache is not touched.
func (b *fastStatBucket) NewMultiRangeDownloader(
	ctx context.Context, req *gcs.MultiRangeDownloaderRequest) (mrd gcs.MultiRangeDownloader, err error) {
	return b.wrapped.NewMultiRangeDownloader(ctx, req)
}