internal/monitor/bucket.go (197 lines of code) (raw):

// Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package monitor import ( "context" "fmt" "time" storagev2 "cloud.google.com/go/storage" "github.com/googlecloudplatform/gcsfuse/v2/common" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" ) // recordRequest records a request and its latency. func recordRequest(ctx context.Context, metricHandle common.MetricHandle, method string, start time.Time) { metricHandle.GCSRequestCount(ctx, 1, []common.MetricAttr{{Key: common.GCSMethod, Value: method}}) latencyUs := time.Since(start).Microseconds() latencyMs := float64(latencyUs) / 1000.0 metricHandle.GCSRequestLatency(ctx, latencyMs, []common.MetricAttr{{Key: common.GCSMethod, Value: method}}) } func CaptureMultiRangeDownloaderMetrics(ctx context.Context, metricHandle common.MetricHandle, method string, start time.Time) { recordRequest(ctx, metricHandle, method, start) } // NewMonitoringBucket returns a gcs.Bucket that exports metrics for monitoring func NewMonitoringBucket(b gcs.Bucket, m common.MetricHandle) gcs.Bucket { return &monitoringBucket{ wrapped: b, metricHandle: m, } } type monitoringBucket struct { wrapped gcs.Bucket metricHandle common.MetricHandle } func (mb *monitoringBucket) Name() string { return mb.wrapped.Name() } func (mb *monitoringBucket) BucketType() gcs.BucketType { return mb.wrapped.BucketType() } func setupReader(ctx context.Context, mb *monitoringBucket, req *gcs.ReadObjectRequest, method string) (gcs.StorageReader, error) { startTime := time.Now() rc, err := mb.wrapped.NewReaderWithReadHandle(ctx, req) if err == nil { rc = newGCSFullReadCloser(rc) rc = newMonitoringReadCloser(ctx, req.Name, rc, mb.metricHandle) } recordRequest(ctx, mb.metricHandle, method, startTime) return rc, err } func (mb *monitoringBucket) NewReaderWithReadHandle( ctx context.Context, req *gcs.ReadObjectRequest) (rd gcs.StorageReader, err error) { // Using NewReader here also as NewReader() method is not used and will be removed. rd, err = setupReader(ctx, mb, req, "NewReader") return } func (mb *monitoringBucket) CreateObject( ctx context.Context, req *gcs.CreateObjectRequest) (*gcs.Object, error) { startTime := time.Now() o, err := mb.wrapped.CreateObject(ctx, req) recordRequest(ctx, mb.metricHandle, "CreateObject", startTime) return o, err } func (mb *monitoringBucket) CreateObjectChunkWriter(ctx context.Context, req *gcs.CreateObjectRequest, chunkSize int, callBack func(bytesUploadedSoFar int64)) (gcs.Writer, error) { startTime := time.Now() wc, err := mb.wrapped.CreateObjectChunkWriter(ctx, req, chunkSize, callBack) recordRequest(ctx, mb.metricHandle, "CreateObjectChunkWriter", startTime) return wc, err } func (mb *monitoringBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.MinObject, error) { startTime := time.Now() o, err := mb.wrapped.FinalizeUpload(ctx, w) recordRequest(ctx, mb.metricHandle, "FinalizeUpload", startTime) return o, err } func (mb *monitoringBucket) FlushPendingWrites(ctx context.Context, w gcs.Writer) (*gcs.MinObject, error) { startTime := time.Now() o, err := mb.wrapped.FlushPendingWrites(ctx, w) recordRequest(ctx, mb.metricHandle, "FlushPendingWrites", startTime) return o, err } func (mb *monitoringBucket) CopyObject( ctx context.Context, req *gcs.CopyObjectRequest) (*gcs.Object, error) { startTime := time.Now() o, err := mb.wrapped.CopyObject(ctx, req) recordRequest(ctx, mb.metricHandle, "CopyObject", startTime) return o, err } func (mb *monitoringBucket) ComposeObjects( ctx context.Context, req *gcs.ComposeObjectsRequest) (*gcs.Object, error) { startTime := time.Now() o, err := mb.wrapped.ComposeObjects(ctx, req) recordRequest(ctx, mb.metricHandle, "ComposeObjects", startTime) return o, err } func (mb *monitoringBucket) StatObject( ctx context.Context, req *gcs.StatObjectRequest) (*gcs.MinObject, *gcs.ExtendedObjectAttributes, error) { startTime := time.Now() m, e, err := mb.wrapped.StatObject(ctx, req) recordRequest(ctx, mb.metricHandle, "StatObject", startTime) return m, e, err } func (mb *monitoringBucket) ListObjects( ctx context.Context, req *gcs.ListObjectsRequest) (*gcs.Listing, error) { startTime := time.Now() listing, err := mb.wrapped.ListObjects(ctx, req) recordRequest(ctx, mb.metricHandle, "ListObjects", startTime) return listing, err } func (mb *monitoringBucket) UpdateObject( ctx context.Context, req *gcs.UpdateObjectRequest) (*gcs.Object, error) { startTime := time.Now() o, err := mb.wrapped.UpdateObject(ctx, req) recordRequest(ctx, mb.metricHandle, "UpdateObject", startTime) return o, err } func (mb *monitoringBucket) DeleteObject( ctx context.Context, req *gcs.DeleteObjectRequest) error { startTime := time.Now() err := mb.wrapped.DeleteObject(ctx, req) recordRequest(ctx, mb.metricHandle, "DeleteObject", startTime) return err } func (mb *monitoringBucket) MoveObject(ctx context.Context, req *gcs.MoveObjectRequest) (*gcs.Object, error) { startTime := time.Now() o, err := mb.wrapped.MoveObject(ctx, req) recordRequest(ctx, mb.metricHandle, "MoveObject", startTime) return o, err } func (mb *monitoringBucket) DeleteFolder(ctx context.Context, folderName string) error { startTime := time.Now() err := mb.wrapped.DeleteFolder(ctx, folderName) recordRequest(ctx, mb.metricHandle, "DeleteFolder", startTime) return err } func (mb *monitoringBucket) GetFolder(ctx context.Context, folderName string) (*gcs.Folder, error) { startTime := time.Now() folder, err := mb.wrapped.GetFolder(ctx, folderName) recordRequest(ctx, mb.metricHandle, "GetFolder", startTime) return folder, err } func (mb *monitoringBucket) CreateFolder(ctx context.Context, folderName string) (*gcs.Folder, error) { startTime := time.Now() folder, err := mb.wrapped.CreateFolder(ctx, folderName) recordRequest(ctx, mb.metricHandle, "CreateFolder", startTime) return folder, err } func (mb *monitoringBucket) RenameFolder(ctx context.Context, folderName string, destinationFolderId string) (o *gcs.Folder, err error) { startTime := time.Now() o, err = mb.wrapped.RenameFolder(ctx, folderName, destinationFolderId) recordRequest(ctx, mb.metricHandle, "RenameFolder", startTime) return } func (mb *monitoringBucket) NewMultiRangeDownloader( ctx context.Context, req *gcs.MultiRangeDownloaderRequest) (mrd gcs.MultiRangeDownloader, err error) { startTime := time.Now() mrd, err = mb.wrapped.NewMultiRangeDownloader(ctx, req) recordRequest(ctx, mb.metricHandle, "NewMultiRangeDownloader", startTime) return } // recordReader increments the reader count when it's opened or closed. func recordReader(ctx context.Context, metricHandle common.MetricHandle, ioMethod string) { metricHandle.GCSReaderCount(ctx, 1, []common.MetricAttr{{Key: common.IOMethod, Value: ioMethod}}) } // Monitoring on the object reader func newMonitoringReadCloser(ctx context.Context, object string, rc gcs.StorageReader, metricHandle common.MetricHandle) gcs.StorageReader { recordReader(ctx, metricHandle, "opened") return &monitoringReadCloser{ ctx: ctx, object: object, wrapped: rc, metricHandle: metricHandle, } } type monitoringReadCloser struct { ctx context.Context object string wrapped gcs.StorageReader metricHandle common.MetricHandle } func (mrc *monitoringReadCloser) Read(p []byte) (n int, err error) { n, err = mrc.wrapped.Read(p) mrc.metricHandle.GCSReadBytesCount(mrc.ctx, int64(n)) return } func (mrc *monitoringReadCloser) Close() (err error) { err = mrc.wrapped.Close() if err != nil { return fmt.Errorf("close reader: %w", err) } recordReader(mrc.ctx, mrc.metricHandle, "closed") return } func (mrc *monitoringReadCloser) ReadHandle() (rh storagev2.ReadHandle) { rh = mrc.wrapped.ReadHandle() recordReader(mrc.ctx, mrc.metricHandle, "ReadHandle") return }