internal/bufferedwrites/buffered_write_handler.go
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bufferedwrites
import (
"errors"
"fmt"
"math"
"time"
"github.com/googlecloudplatform/gcsfuse/v2/internal/block"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"golang.org/x/sync/semaphore"
)
// Note: All write operations take the inode lock in fs.go, so no locking is
// needed here; write operations arrive serially.
type BufferedWriteHandler interface {
// Write writes the given data to the buffer. It writes to the existing buffer
// if capacity is available, otherwise it writes to a new buffer.
Write(data []byte, offset int64) (err error)
// Sync uploads all the pending buffers to GCS.
// Sync returns:
// 1. The un-finalized object created on GCS, for zonal buckets.
// 2. A nil object, for non-zonal buckets.
Sync() (*gcs.MinObject, error)
// Flush finalizes the upload.
Flush() (*gcs.MinObject, error)
// SetMtime stores the mtime with the bufferedWriteHandler.
SetMtime(mtime time.Time)
// Truncate allows truncating the file to a larger size.
Truncate(size int64) error
// WriteFileInfo returns the file info, i.e., how much data has been buffered
// so far and the mtime.
WriteFileInfo() WriteFileInfo
// Destroy destroys the upload handler and then frees up the buffers.
Destroy() error
// Unlink cancels the ongoing upload and frees up the buffers.
Unlink()
}
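// A minimal usage sketch of the lifecycle (the bucket, object name, and sizes
// below are hypothetical; error handling is elided for brevity):
//
//	bwh, _ := NewBWHandler(&CreateBWHandlerRequest{
//		ObjectName:               "dir/file.txt",            // hypothetical
//		Bucket:                   bucket,                    // some gcs.Bucket
//		BlockSize:                8 * 1024 * 1024,
//		MaxBlocksPerFile:         4,
//		GlobalMaxBlocksSem:       semaphore.NewWeighted(16),
//		ChunkTransferTimeoutSecs: 10,
//	})
//	_ = bwh.Write([]byte("hello"), 0) // offsets must be sequential
//	_, _ = bwh.Flush()                // finalizes the GCS object
//	_ = bwh.Destroy()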
// bufferedWriteHandlerImpl is responsible for filling up the buffers with data
// as it is received and handing them over to the uploadHandler, which uploads
// them to GCS.
type bufferedWriteHandlerImpl struct {
current block.Block
blockPool *block.BlockPool
uploadHandler *UploadHandler
// Total size of the data buffered so far. Some of the buffered data may
// already have been uploaded to GCS. Depending on the current state, it may
// or may not include truncatedSize.
totalSize int64
// Stores the mtime value updated by kernel as part of setInodeAttributes call.
mtime time.Time
// Stores the size to truncate to. No action is taken when Truncate is called;
// the value is used as follows:
// 1. During flush, if totalSize != truncatedSize, additional dummy
// (zero-filled) data is appended before the flush and uploaded.
// 2. If a write starts at the truncate offset, dummy data is created up to
// truncatedSize and the new data is then appended to it.
truncatedSize int64
}
// WriteFileInfo is used as part of serving fileInode attributes (GetInodeAttributes call).
type WriteFileInfo struct {
TotalSize int64
Mtime time.Time
}
var ErrOutOfOrderWrite = errors.New("outOfOrder write detected")
type CreateBWHandlerRequest struct {
Object *gcs.Object
ObjectName string
Bucket gcs.Bucket
BlockSize int64
MaxBlocksPerFile int64
GlobalMaxBlocksSem *semaphore.Weighted
ChunkTransferTimeoutSecs int64
}
// NewBWHandler creates a bufferedWriteHandlerImpl and returns it as a
// BufferedWriteHandler.
func NewBWHandler(req *CreateBWHandlerRequest) (bwh BufferedWriteHandler, err error) {
bp, err := block.NewBlockPool(req.BlockSize, req.MaxBlocksPerFile, req.GlobalMaxBlocksSem)
if err != nil {
return
}
bwh = &bufferedWriteHandlerImpl{
current: nil,
blockPool: bp,
uploadHandler: newUploadHandler(&CreateUploadHandlerRequest{
Object: req.Object,
ObjectName: req.ObjectName,
Bucket: req.Bucket,
FreeBlocksCh: bp.FreeBlocksChannel(),
MaxBlocksPerFile: req.MaxBlocksPerFile,
BlockSize: req.BlockSize,
ChunkTransferTimeoutSecs: req.ChunkTransferTimeoutSecs,
}),
totalSize: 0,
mtime: time.Now(),
truncatedSize: -1,
}
return
}
func (wh *bufferedWriteHandlerImpl) Write(data []byte, offset int64) (err error) {
// Fail early if the uploadHandler has already failed.
err = wh.uploadHandler.UploadError()
if err != nil {
return
}
if offset != wh.totalSize && offset != wh.truncatedSize {
logger.Errorf("BufferedWriteHandler.OutOfOrderError for object: %s, expectedOffset: %d, actualOffset: %d",
wh.uploadHandler.objectName, wh.totalSize, offset)
return ErrOutOfOrderWrite
}
if offset == wh.truncatedSize {
// Fill in dummy data up to the truncated size, if needed.
err = wh.writeDataForTruncatedSize()
if err != nil {
return
}
}
return wh.appendBuffer(data)
}
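// For illustration (hypothetical calls): a write is accepted only at the
// current end of the buffered data (or at the truncated size); anything else
// returns ErrOutOfOrderWrite:
//
//	_ = bwh.Write(make([]byte, 4096), 0)    // ok: offset == totalSize (0)
//	_ = bwh.Write(make([]byte, 4096), 4096) // ok: offset == totalSize (4096)
//	err := bwh.Write([]byte{1}, 100)        // ErrOutOfOrderWrite: 100 is
//	                                        // neither totalSize nor truncatedSize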
func (wh *bufferedWriteHandlerImpl) appendBuffer(data []byte) (err error) {
dataWritten := 0
for dataWritten < len(data) {
if wh.current == nil {
wh.current, err = wh.blockPool.Get()
if err != nil {
return fmt.Errorf("failed to get new block: %w", err)
}
}
// Copy as much of the remaining data as fits in the current block.
remainingBlockSize := float64(wh.blockPool.BlockSize()) - float64(wh.current.Size())
pendingDataForWrite := float64(len(data)) - float64(dataWritten)
bytesToCopy := int(math.Min(remainingBlockSize, pendingDataForWrite))
err := wh.current.Write(data[dataWritten : dataWritten+bytesToCopy])
if err != nil {
return err
}
dataWritten += bytesToCopy
// The block is full; hand it off for upload and start a fresh block.
if wh.current.Size() == wh.blockPool.BlockSize() {
err := wh.uploadHandler.Upload(wh.current)
if err != nil {
return err
}
wh.current = nil
}
}
wh.totalSize += int64(dataWritten)
return
}
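// Worked example (hypothetical sizes): with a 2 MiB block size, a single
// 5 MiB append is split across blocks as follows:
//
//	block 1: bytes [0 MiB, 2 MiB) -> full, handed to uploadHandler
//	block 2: bytes [2 MiB, 4 MiB) -> full, handed to uploadHandler
//	block 3: bytes [4 MiB, 5 MiB) -> partially filled, kept in wh.current
//	                                 for the next Write, Sync, or Flush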
func (wh *bufferedWriteHandlerImpl) Sync() (o *gcs.MinObject, err error) {
// Upload current block (for both regional and zonal buckets).
if wh.current != nil && wh.current.Size() != 0 {
err = wh.uploadHandler.Upload(wh.current)
if err != nil {
return nil, err
}
wh.current = nil
}
// Upload all the pending buffers.
wh.uploadHandler.AwaitBlocksUpload()
// FlushPendingWrites synchronizes all bytes currently residing in the
// writer's buffer to Cloud Storage, making them available to other
// operations such as read.
// This functionality is supported only on zonal buckets.
if wh.uploadHandler.bucket.BucketType().Zonal {
o, err = wh.uploadHandler.FlushPendingWrites()
if err != nil {
return nil, err
}
if o.Size != uint64(wh.totalSize) {
return nil, fmt.Errorf("could not upload entire data, expected offset %d, Got %d", wh.totalSize, o.Size)
}
}
// Release memory used by buffers.
err = wh.blockPool.ClearFreeBlockChannel()
if err != nil {
// Only log an error for the potential resource leak, since the upload succeeded.
logger.Errorf("blockPool.ClearFreeBlockChannel() failed during sync: %v", err)
}
err = wh.uploadHandler.UploadError()
if err != nil {
return nil, err
}
return o, nil
}
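// Hedged usage sketch (hypothetical caller): on zonal buckets Sync returns the
// un-finalized object, whose Size reflects all bytes persisted so far; on
// non-zonal buckets it returns (nil, nil) on success:
//
//	minObj, err := bwh.Sync()
//	if err == nil && minObj != nil {
//		// Zonal bucket: minObj.Size bytes are persisted and readable.
//	}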
// Flush finalizes the upload.
func (wh *bufferedWriteHandlerImpl) Flush() (*gcs.MinObject, error) {
// Fail early if upload already failed.
err := wh.uploadHandler.UploadError()
if err != nil {
return nil, err
}
// If the file was truncated, upload zero-filled blocks as required.
err = wh.writeDataForTruncatedSize()
if err != nil {
return nil, err
}
if wh.current != nil {
err := wh.uploadHandler.Upload(wh.current)
if err != nil {
return nil, err
}
wh.current = nil
}
obj, err := wh.uploadHandler.Finalize()
if err != nil {
return nil, fmt.Errorf("BufferedWriteHandler.Flush(): %w", err)
}
err = wh.blockPool.ClearFreeBlockChannel()
if err != nil {
// Only log an error for the potential resource leak, since the upload succeeded.
logger.Errorf("blockPool.ClearFreeBlockChannel() failed: %v", err)
}
return obj, nil
}
func (wh *bufferedWriteHandlerImpl) SetMtime(mtime time.Time) {
wh.mtime = mtime
}
func (wh *bufferedWriteHandlerImpl) Truncate(size int64) error {
if size < wh.totalSize {
return fmt.Errorf("cannot truncate to lesser size when upload is in progress")
}
wh.truncatedSize = size
return nil
}
func (wh *bufferedWriteHandlerImpl) WriteFileInfo() WriteFileInfo {
return WriteFileInfo{
TotalSize: int64(math.Max(float64(wh.totalSize), float64(wh.truncatedSize))),
Mtime: wh.mtime,
}
}
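// For example (hypothetical values): after buffering 4 KiB and then calling
// Truncate(1 << 20), the reported size is the truncated size, since the
// logical file size is max(totalSize, truncatedSize):
//
//	_ = wh.Truncate(1 << 20)   // logical size becomes 1 MiB
//	info := wh.WriteFileInfo() // info.TotalSize == 1<<20, even though only
//	                           // 4 KiB has actually been buffered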
func (wh *bufferedWriteHandlerImpl) Destroy() error {
wh.uploadHandler.Destroy()
return wh.blockPool.ClearFreeBlockChannel()
}
func (wh *bufferedWriteHandlerImpl) writeDataForTruncatedSize() error {
// If totalSize is greater than or equal to truncatedSize, the user has
// already written at least as much data as the truncate target, so there is
// nothing to fill.
if wh.totalSize >= wh.truncatedSize {
return nil
}
// Otherwise append dummy data to match truncatedSize.
diff := wh.truncatedSize - wh.totalSize
// Create 1 MiB of data at a time to avoid OOM.
chunkSize := 1024 * 1024
for i := 0; i < int(diff); i += chunkSize {
size := math.Min(float64(chunkSize), float64(int(diff)-i))
err := wh.appendBuffer(make([]byte, int(size)))
if err != nil {
return err
}
}
return nil
}
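// Worked example (hypothetical sizes): after writing 1 MiB at offset 0 and
// calling Truncate(4 << 20), Flush zero-fills the remaining 3 MiB in 1 MiB
// chunks before finalizing, so the finalized object is exactly 4 MiB:
//
//	_ = bwh.Write(make([]byte, 1<<20), 0) // 1 MiB of real data
//	_ = bwh.Truncate(4 << 20)             // logical size: 4 MiB
//	obj, _ := bwh.Flush()                 // obj.Size == 4<<20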
func (wh *bufferedWriteHandlerImpl) Unlink() {
wh.uploadHandler.CancelUpload()
err := wh.blockPool.ClearFreeBlockChannel()
if err != nil {
// Only log an error for the potential resource leak.
logger.Errorf("blockPool.ClearFreeBlockChannel() failed: %v", err)
}
}