internal/bufferedwrites/upload_handler.go (155 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Note: All the write operations take inode lock in fs.go, hence we don't need
// any locks here as we will get calls to these methods serially.
package bufferedwrites
import (
"context"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"github.com/googlecloudplatform/gcsfuse/v2/internal/block"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
)
// UploadHandler is responsible for synchronized uploads of the filled blocks
// to GCS and then putting them back for reuse once the block has been uploaded.
type UploadHandler struct {
// Channel for receiving blocks to be uploaded to GCS. Buffered with
// MaxBlocksPerFile capacity (see newUploadHandler).
uploadCh chan block.Block
// Wait group tracking blocks handed to the uploader goroutine; incremented
// in Upload() and decremented by uploader() after each block is processed.
wg sync.WaitGroup
// Channel on which uploaded block will be posted for reuse by the writer.
freeBlocksCh chan block.Block
// writer to resumable upload the blocks to GCS; created lazily by
// ensureWriter()/createObjectWriter().
writer gcs.Writer
// uploadError stores atomic pointer to the first error seen by the uploader
// (or by Finalize/FlushPendingWrites); once set, subsequent blocks are
// recycled without being uploaded.
uploadError atomic.Pointer[error]
// CancelFunc persisted to cancel the uploads in case of unlink operation.
cancelFunc context.CancelFunc
// Guards one-time start of the uploader goroutine.
startUploader sync.Once
// Parameters required for creating a new GCS chunk writer.
bucket gcs.Bucket
objectName string
obj *gcs.Object
chunkTransferTimeout int64
blockSize int64
}
// CreateUploadHandlerRequest carries the parameters needed to construct an
// UploadHandler via newUploadHandler.
type CreateUploadHandlerRequest struct {
// Existing GCS object being overwritten, or nil for a new object.
Object *gcs.Object
// Name of the GCS object being written.
ObjectName string
// Bucket the object is uploaded to.
Bucket gcs.Bucket
// Shared pool channel that uploaded blocks are returned to for reuse.
FreeBlocksCh chan block.Block
// Capacity of the upload queue (uploadCh buffer size).
MaxBlocksPerFile int64
// Size of each block; also used as the GCS chunk writer's chunk size.
BlockSize int64
// Per-chunk transfer timeout in seconds. NOTE(review): currently stored but
// not passed to the chunk writer (createObjectWriter hard-codes 0 pending
// b/381479965) — confirm intended.
ChunkTransferTimeoutSecs int64
}
// newUploadHandler builds an UploadHandler from the given request. The GCS
// writer itself is created lazily on the first Upload/Finalize/Flush call.
func newUploadHandler(req *CreateUploadHandlerRequest) *UploadHandler {
	handler := new(UploadHandler)
	// Buffered queue so Upload() does not block until MaxBlocksPerFile blocks
	// are in flight; wg and startUploader are usable at their zero values.
	handler.uploadCh = make(chan block.Block, req.MaxBlocksPerFile)
	handler.freeBlocksCh = req.FreeBlocksCh
	handler.bucket = req.Bucket
	handler.objectName = req.ObjectName
	handler.obj = req.Object
	handler.blockSize = req.BlockSize
	handler.chunkTransferTimeout = req.ChunkTransferTimeoutSecs
	return handler
}
// Upload adds a block to the upload queue. It lazily creates the GCS writer
// and starts the single uploader goroutine on first use. Returns an error if
// the GCS chunk writer could not be created; the block is NOT queued in that
// case.
func (uh *UploadHandler) Upload(block block.Block) error {
	// Account for this block before handing it off; uploader() calls wg.Done
	// once the block has been processed and recycled.
	uh.wg.Add(1)
	if err := uh.ensureWriter(); err != nil {
		// Bug fix: undo the Add above. The block never reaches the uploader,
		// so without this Done a failed writer creation would leak a WaitGroup
		// count and deadlock every later wg.Wait() (Finalize, FlushPendingWrites,
		// CancelUpload, AwaitBlocksUpload).
		uh.wg.Done()
		return fmt.Errorf("uh.ensureWriter() failed: %w", err)
	}

	// Start the uploader goroutine but only once.
	uh.startUploader.Do(func() {
		go uh.uploader()
	})
	uh.uploadCh <- block
	return nil
}
// createObjectWriter creates the GCS chunk writer used for the resumable
// upload and stashes its cancel function for CancelUpload.
func (uh *UploadHandler) createObjectWriter() error {
	// TODO: b/381479965: Dynamically set chunkTransferTimeoutSecs based on chunk size. 0 here means no timeout.
	createReq := gcs.NewCreateObjectRequest(uh.obj, uh.objectName, nil, 0)
	// A fresh background context is required here: the caller's writeFile()
	// context is cancelled long before the full resumable upload completes.
	ctx, cancel := context.WithCancel(context.Background())
	uh.cancelFunc = cancel
	var err error
	uh.writer, err = uh.bucket.CreateObjectChunkWriter(ctx, createReq, int(uh.blockSize), nil)
	return err
}
// UploadError returns the first error recorded by the uploader (or by
// Finalize/FlushPendingWrites), or nil if no upload has failed.
func (uh *UploadHandler) UploadError() error {
	errPtr := uh.uploadError.Load()
	if errPtr == nil {
		return nil
	}
	return *errPtr
}
// uploader is the single-threaded goroutine that drains uploadCh: each block
// is copied into the GCS writer (unless a previous block already failed),
// then handed back to the free pool and marked done on the wait group.
func (uh *UploadHandler) uploader() {
	for blk := range uh.uploadCh {
		// Once an error has been recorded, skip the copy and just recycle the
		// remaining blocks so waiters are not starved.
		if uh.UploadError() == nil {
			_, copyErr := io.Copy(uh.writer, blk.Reader())
			// A canceled context means the file was deleted from the same
			// mount; suppress that error to match local-filesystem behavior.
			if copyErr != nil && !errors.Is(copyErr, context.Canceled) {
				logger.Errorf("buffered write upload failed for object %s: error in io.Copy: %v", uh.objectName, copyErr)
				copyErr = gcs.GetGCSError(copyErr)
				uh.uploadError.Store(&copyErr)
			}
		}
		// Put back the processed block on the free channel for re-use.
		uh.freeBlocksCh <- blk
		uh.wg.Done()
	}
}
// Finalize waits for all queued blocks to finish uploading, closes the upload
// queue, and completes the resumable upload, returning the finalized object.
func (uh *UploadHandler) Finalize() (*gcs.MinObject, error) {
	uh.wg.Wait()
	close(uh.uploadCh)

	// Writer may not exist yet: empty-file creation, or total writes smaller
	// than one block never triggered Upload().
	if err := uh.ensureWriter(); err != nil {
		return nil, fmt.Errorf("uh.ensureWriter() failed: %v", err)
	}

	minObj, finalizeErr := uh.bucket.FinalizeUpload(context.Background(), uh.writer)
	if finalizeErr == nil {
		return minObj, nil
	}
	// FinalizeUpload already returns a GCS error, so no conversion is needed.
	uh.uploadError.Store(&finalizeErr)
	logger.Errorf("FinalizeUpload failed for object %s: %v", uh.objectName, finalizeErr)
	return nil, finalizeErr
}
// ensureWriter lazily creates the GCS chunk writer if it does not exist yet.
func (uh *UploadHandler) ensureWriter() error {
	if uh.writer != nil {
		return nil
	}
	if err := uh.createObjectWriter(); err != nil {
		return fmt.Errorf("createObjectWriter failed for object %s: %w", uh.objectName, err)
	}
	return nil
}
// FlushPendingWrites waits for queued blocks to upload and then flushes any
// buffered data to GCS without finalizing the object.
func (uh *UploadHandler) FlushPendingWrites() (*gcs.MinObject, error) {
	uh.wg.Wait()

	// Writer may not exist yet: empty-file creation, or total writes smaller
	// than one block never triggered Upload().
	if err := uh.ensureWriter(); err != nil {
		return nil, fmt.Errorf("uh.ensureWriter() failed: %v", err)
	}

	minObj, flushErr := uh.bucket.FlushPendingWrites(context.Background(), uh.writer)
	if flushErr == nil {
		return minObj, nil
	}
	// FlushPendingWrites already returns a GCS error, so no conversion is needed.
	uh.uploadError.Store(&flushErr)
	logger.Errorf("FlushUpload failed for object %s: %v", uh.objectName, flushErr)
	return nil, flushErr
}
// CancelUpload aborts any in-progress GCS upload (used on unlink) and waits
// until all in-flight blocks have been returned to the free pool.
func (uh *UploadHandler) CancelUpload() {
	// Cancelling the writer's context aborts the ongoing GCS upload.
	if cancel := uh.cancelFunc; cancel != nil {
		cancel()
	}
	// Wait for all in-progress buffers to be added to the free channel.
	uh.wg.Wait()
}
// AwaitBlocksUpload blocks until every block queued via Upload has been
// processed by the uploader goroutine (uploaded or recycled after an error).
func (uh *UploadHandler) AwaitBlocksUpload() {
uh.wg.Wait()
}
// Destroy drains any not-yet-uploaded blocks back to the free pool and closes
// uploadCh (if it is not already closed) so the uploader goroutine exits.
// Safe to call after Finalize has closed the channel: a receive on a closed
// channel is always ready, so the receive case (with ok == false) wins over
// the default case and we simply return.
func (uh *UploadHandler) Destroy() {
// Move all pending blocks to freeBlockCh and close the channel if not done.
for {
select {
case currBlock, ok := <-uh.uploadCh:
// Not ok means channel already closed (e.g. by Finalize). Return.
if !ok {
return
}
uh.freeBlocksCh <- currBlock
// Marking as wg.Done to ensure any waiters are unblocked.
uh.wg.Done()
default:
// Executed only when uploadCh is open and empty: close it so the
// uploader goroutine's range loop terminates, then return.
close(uh.uploadCh)
return
}
}
}