internal/storage/debug_bucket.go (270 lines of code) (raw):

// Copyright 2023 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package storage import ( "fmt" "io" "sync/atomic" "time" storagev2 "cloud.google.com/go/storage" "github.com/googlecloudplatform/gcsfuse/v2/internal/logger" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "golang.org/x/net/context" ) // Wrap the supplied bucket in a layer that prints debug messages. func NewDebugBucket( wrapped gcs.Bucket) (b gcs.Bucket) { b = &debugBucket{ wrapped: wrapped, } return } type debugBucket struct { wrapped gcs.Bucket nextRequestID uint64 } //////////////////////////////////////////////////////////////////////// // Helpers //////////////////////////////////////////////////////////////////////// func (b *debugBucket) mintRequestID() (id uint64) { id = atomic.AddUint64(&b.nextRequestID, 1) - 1 return } func (b *debugBucket) requestLogf( id uint64, format string, v ...interface{}) { logger.Tracef("gcs: Req %#16x: %s", id, fmt.Sprintf(format, v...)) } func (b *debugBucket) startRequest( format string, v ...interface{}) (id uint64, desc string, start time.Time) { start = time.Now() id = b.mintRequestID() desc = fmt.Sprintf(format, v...) b.requestLogf(id, "<- %s", desc) return } func (b *debugBucket) finishRequest( id uint64, desc string, start time.Time, err *error) { duration := time.Since(start) errDesc := "OK" if *err != nil { errDesc = (*err).Error() } b.requestLogf(id, "-> %s (%v): %s", desc, duration, errDesc) } //////////////////////////////////////////////////////////////////////// // Reader //////////////////////////////////////////////////////////////////////// type debugReader struct { bucket *debugBucket requestID uint64 desc string startTime time.Time wrapped io.ReadCloser } func (dr *debugReader) Read(p []byte) (n int, err error) { n, err = dr.wrapped.Read(p) // Don't log EOF errors, which are par for the course. if err != nil && err != io.EOF { dr.bucket.requestLogf(dr.requestID, "-> Read error: %v", err) } return } func (dr *debugReader) Close() (err error) { defer dr.bucket.finishRequest( dr.requestID, dr.desc, dr.startTime, &err) err = dr.wrapped.Close() return } func (dr *debugReader) ReadHandle() storagev2.ReadHandle { hd := "opaque-handle" return []byte(hd) } //////////////////////////////////////////////////////////////////////// // Bucket interface //////////////////////////////////////////////////////////////////////// func (b *debugBucket) Name() string { return b.wrapped.Name() } func (b *debugBucket) BucketType() gcs.BucketType { return b.wrapped.BucketType() } func setupReader(ctx context.Context, b *debugBucket, req *gcs.ReadObjectRequest, method string) (gcs.StorageReader, error) { id, desc, start := b.startRequest("%q(%q, %v)", method, req.Name, req.Range) // Call through. rc, err := b.wrapped.NewReaderWithReadHandle(ctx, req) if err != nil { b.finishRequest(id, desc, start, &err) return rc, err } // Return a special reader that prings debug info. rc = &debugReader{ bucket: b, requestID: id, desc: desc, startTime: start, wrapped: rc, } return rc, err } func (b *debugBucket) NewReaderWithReadHandle( ctx context.Context, req *gcs.ReadObjectRequest) (rd gcs.StorageReader, err error) { rd, err = setupReader(ctx, b, req, "ReadWithReadHandle") return } func (b *debugBucket) CreateObject( ctx context.Context, req *gcs.CreateObjectRequest) (o *gcs.Object, err error) { id, desc, start := b.startRequest("CreateObject(%q)", req.Name) defer b.finishRequest(id, desc, start, &err) o, err = b.wrapped.CreateObject(context.WithValue(ctx, gcs.ReqIdField, id), req) return } func (b *debugBucket) CreateObjectChunkWriter(ctx context.Context, req *gcs.CreateObjectRequest, chunkSize int, callBack func(bytesUploadedSoFar int64)) (wc gcs.Writer, err error) { id, desc, start := b.startRequest("CreateObjectChunkWriter(%q)", req.Name) defer b.finishRequest(id, desc, start, &err) wc, err = b.wrapped.CreateObjectChunkWriter(context.WithValue(ctx, gcs.ReqIdField, id), req, chunkSize, callBack) return } func (b *debugBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (o *gcs.MinObject, err error) { id, desc, start := b.startRequest("FinalizeUpload(%q)", w.ObjectName()) defer b.finishRequest(id, desc, start, &err) o, err = b.wrapped.FinalizeUpload(ctx, w) return } func (b *debugBucket) FlushPendingWrites(ctx context.Context, w gcs.Writer) (o *gcs.MinObject, err error) { id, desc, start := b.startRequest("FlushPendingWrites(%q)", w.ObjectName()) defer b.finishRequest(id, desc, start, &err) o, err = b.wrapped.FlushPendingWrites(ctx, w) return } func (b *debugBucket) CopyObject( ctx context.Context, req *gcs.CopyObjectRequest) (o *gcs.Object, err error) { id, desc, start := b.startRequest( "CopyObject(%q, %q)", req.SrcName, req.DstName) defer b.finishRequest(id, desc, start, &err) o, err = b.wrapped.CopyObject(ctx, req) return } func (b *debugBucket) ComposeObjects( ctx context.Context, req *gcs.ComposeObjectsRequest) (o *gcs.Object, err error) { id, desc, start := b.startRequest( "ComposeObjects(%q)", req.DstName) defer b.finishRequest(id, desc, start, &err) o, err = b.wrapped.ComposeObjects(ctx, req) return } func (b *debugBucket) StatObject( ctx context.Context, req *gcs.StatObjectRequest) (m *gcs.MinObject, e *gcs.ExtendedObjectAttributes, err error) { id, desc, start := b.startRequest("StatObject(%q)", req.Name) defer b.finishRequest(id, desc, start, &err) m, e, err = b.wrapped.StatObject(ctx, req) return } func (b *debugBucket) ListObjects( ctx context.Context, req *gcs.ListObjectsRequest) (listing *gcs.Listing, err error) { id, desc, start := b.startRequest("ListObjects(%q)", req.Prefix) defer b.finishRequest(id, desc, start, &err) listing, err = b.wrapped.ListObjects(ctx, req) return } func (b *debugBucket) UpdateObject( ctx context.Context, req *gcs.UpdateObjectRequest) (o *gcs.Object, err error) { id, desc, start := b.startRequest("UpdateObject(%q)", req.Name) defer b.finishRequest(id, desc, start, &err) o, err = b.wrapped.UpdateObject(ctx, req) return } func (b *debugBucket) DeleteObject( ctx context.Context, req *gcs.DeleteObjectRequest) (err error) { id, desc, start := b.startRequest("DeleteObject(%q)", req.Name) defer b.finishRequest(id, desc, start, &err) err = b.wrapped.DeleteObject(ctx, req) return } func (b *debugBucket) MoveObject(ctx context.Context, req *gcs.MoveObjectRequest) (*gcs.Object, error) { var err error var o *gcs.Object id, desc, start := b.startRequest("MoveObject(%q, %q)", req.SrcName, req.DstName) defer b.finishRequest(id, desc, start, &err) o, err = b.wrapped.MoveObject(ctx, req) return o, err } func (b *debugBucket) DeleteFolder(ctx context.Context, folderName string) (err error) { id, desc, start := b.startRequest("DeleteFolder(%q)", folderName) defer b.finishRequest(id, desc, start, &err) err = b.wrapped.DeleteFolder(ctx, folderName) return err } func (b *debugBucket) GetFolder(ctx context.Context, folderName string) (folder *gcs.Folder, err error) { id, desc, start := b.startRequest("GetFolder(%q)", folderName) defer b.finishRequest(id, desc, start, &err) folder, err = b.wrapped.GetFolder(ctx, folderName) return } func (b *debugBucket) CreateFolder(ctx context.Context, folderName string) (folder *gcs.Folder, err error) { id, desc, start := b.startRequest("CreateFolder(%q)", folderName) defer b.finishRequest(id, desc, start, &err) folder, err = b.wrapped.CreateFolder(ctx, folderName) return } func (b *debugBucket) RenameFolder(ctx context.Context, folderName string, destinationFolderId string) (o *gcs.Folder, err error) { id, desc, start := b.startRequest("RenameFolder(%q)", folderName) defer b.finishRequest(id, desc, start, &err) o, err = b.wrapped.RenameFolder(ctx, folderName, destinationFolderId) return o, err } type debugMultiRangeDownloader struct { bucket *debugBucket requestID uint64 desc string startTime time.Time wrapped gcs.MultiRangeDownloader } func (dmrd *debugMultiRangeDownloader) Add(output io.Writer, offset, length int64, callback func(int64, int64, error)) { id, desc, start := dmrd.bucket.startRequest("MultiRangeDownloader.Add(%v,%v)", offset, length) wrapperCallback := func(offset int64, length int64, err error) { defer dmrd.bucket.finishRequest(id, desc, start, &err) if callback != nil { callback(offset, length, err) } } dmrd.wrapped.Add(output, offset, length, wrapperCallback) } func (dmrd *debugMultiRangeDownloader) Close() (err error) { id, desc, start := dmrd.bucket.startRequest("MultiRangeDownloader.Close()") defer dmrd.bucket.finishRequest(id, desc, start, &err) err = dmrd.wrapped.Close() return } func (dmrd *debugMultiRangeDownloader) Wait() { id, desc, start := dmrd.bucket.startRequest("MultiRangeDownloader.Wait()") var err error defer dmrd.bucket.finishRequest(id, desc, start, &err) dmrd.wrapped.Wait() } func (dmrd *debugMultiRangeDownloader) Error() (err error) { err = dmrd.wrapped.Error() return } func (b *debugBucket) NewMultiRangeDownloader( ctx context.Context, req *gcs.MultiRangeDownloaderRequest) (mrd gcs.MultiRangeDownloader, err error) { id, desc, start := b.startRequest("NewMultiRangeDownloader(%q)", req.Name) defer b.finishRequest(id, desc, start, &err) // Call through. mrd, err = b.wrapped.NewMultiRangeDownloader(ctx, req) if err != nil { b.finishRequest(id, desc, start, &err) return } // Return a special reader that prints debug info. mrd = &debugMultiRangeDownloader{ bucket: b, requestID: id, desc: desc, startTime: start, wrapped: mrd, } return }