internal/storage/fake/bucket.go
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fake

import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"hash/crc32"
"io"
"path/filepath"
"sort"
"strings"
"time"
"unicode/utf8"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/jacobsa/syncutil"
"github.com/jacobsa/timeutil"
)
var crc32cTable = crc32.MakeTable(crc32.Castagnoli)
// Equivalent to NewConn(clock).GetBucket(name).
func NewFakeBucket(clock timeutil.Clock, name string, bucketType gcs.BucketType) gcs.Bucket {
b := &bucket{clock: clock, name: name, bucketType: bucketType}
b.mu = syncutil.NewInvariantMutex(b.checkInvariants)
return b
}
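// A minimal usage sketch (hypothetical names; assumes the simulated clock
// from jacobsa/timeutil):
//
//	clock := timeutil.NewSimulatedClock(time.Now())
//	b := NewFakeBucket(clock, "some-bucket", gcs.BucketType{})
//	_, err := b.CreateObject(
//		context.Background(),
//		&gcs.CreateObjectRequest{
//			Name:     "foo/bar",
//			Contents: strings.NewReader("hello"),
//		})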
////////////////////////////////////////////////////////////////////////
// Helper types
////////////////////////////////////////////////////////////////////////
type fakeObject struct {
metadata gcs.Object
data []byte
}
// A slice of objects compared by name.
type fakeObjectSlice []fakeObject
// A slice of folders compared by name.
type fakeFolderSlice []gcs.Folder
func (s fakeObjectSlice) Len() int {
return len(s)
}
func (f fakeFolderSlice) Len() int {
return len(f)
}
func (s fakeObjectSlice) Less(i, j int) bool {
return s[i].metadata.Name < s[j].metadata.Name
}
func (f fakeFolderSlice) Less(i, j int) bool {
return f[i].Name < f[j].Name
}
func (s fakeObjectSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (f fakeFolderSlice) Swap(i, j int) {
f[i], f[j] = f[j], f[i]
}
// Return the smallest i such that s[i].metadata.Name >= name, or len(s) if
// there is no such i.
func (s fakeObjectSlice) lowerBound(name string) int {
pred := func(i int) bool {
return s[i].metadata.Name >= name
}
return sort.Search(len(s), pred)
}
// Return the smallest i such that f[i].Name >= name, or len(f) if
// there is no such i.
func (f fakeFolderSlice) lowerBound(name string) int {
pred := func(i int) bool {
return f[i].Name >= name
}
return sort.Search(len(f), pred)
}
// Return the smallest i such that s[i].metadata.Name == name, or len(s) if
// there is no such i.
func (s fakeObjectSlice) find(name string) int {
lb := s.lowerBound(name)
if lb < len(s) && s[lb].metadata.Name == name {
return lb
}
return len(s)
}
// Return the smallest i such that f[i].Name == name, or len(f) if
// there is no such i.
func (f fakeFolderSlice) find(name string) int {
lb := f.lowerBound(name)
if lb < len(f) && f[lb].Name == name {
return lb
}
return len(f)
}
// Return the smallest string that is lexicographically larger than prefix and
// does not have prefix as a prefix. For the sole case where this is not
// possible (all strings consisting solely of 0xff bytes, including the empty
// string), return the empty string.
func prefixSuccessor(prefix string) string {
// Attempt to increment the last byte. If that is a 0xff byte, erase it and
// recurse. If we hit an empty string, then we know our task is impossible.
limit := []byte(prefix)
for len(limit) > 0 {
b := limit[len(limit)-1]
if b != 0xff {
limit[len(limit)-1]++
break
}
limit = limit[:len(limit)-1]
}
return string(limit)
}
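// A few illustrative values (hypothetical, not from a test):
//
//	prefixSuccessor("foo")      == "fop"
//	prefixSuccessor("ab\xff")   == "ac"
//	prefixSuccessor("\xff\xff") == ""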
// Return the smallest i such that prefix < s[i].metadata.Name and
// !strings.HasPrefix(s[i].metadata.Name, prefix).
func (s fakeObjectSlice) prefixUpperBound(prefix string) int {
successor := prefixSuccessor(prefix)
if successor == "" {
return len(s)
}
return s.lowerBound(successor)
}
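// For example (hypothetical names): with objects ["a/1", "a/2", "b"],
// prefixUpperBound("a/") returns 2, the index of "b", so the half-open range
// [lowerBound("a/"), prefixUpperBound("a/")) covers exactly the objects under
// the prefix.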
////////////////////////////////////////////////////////////////////////
// Helpers
////////////////////////////////////////////////////////////////////////
type bucket struct {
clock timeutil.Clock
name string
bucketType gcs.BucketType
mu syncutil.InvariantMutex
// The set of extant objects.
//
// INVARIANT: Strictly increasing.
objects fakeObjectSlice // GUARDED_BY(mu)
folders fakeFolderSlice // GUARDED_BY(mu)
// The most recent generation number that was minted. The next object will
// receive generation prevGeneration + 1.
//
// INVARIANT: This is an upper bound for generation numbers in objects.
prevGeneration int64 // GUARDED_BY(mu)
}
func checkName(name string) (err error) {
if len(name) == 0 || len(name) > 1024 {
err = errors.New("invalid object name: length must be in [1, 1024]")
return
}
if !utf8.ValidString(name) {
err = errors.New("invalid object name: not valid UTF-8")
return
}
for _, r := range name {
if r == 0x0a || r == 0x0d {
err = errors.New("invalid object name: must not contain CR or LF")
return
}
}
return
}
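// For example (hypothetical): checkName("foo/bar") returns nil, while
// checkName(""), checkName(strings.Repeat("a", 1025)), and
// checkName("foo\nbar") all return errors.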
// LOCKS_REQUIRED(b.mu)
func (b *bucket) checkInvariants() {
// Make sure 'objects' is strictly increasing.
for i := 1; i < len(b.objects); i++ {
objA := b.objects[i-1]
objB := b.objects[i]
if !(objA.metadata.Name < objB.metadata.Name) {
panic(
fmt.Sprintf(
"Object names are not strictly increasing: %v vs. %v",
objA.metadata.Name,
objB.metadata.Name))
}
}
// Make sure prevGeneration is an upper bound for object generation numbers.
for _, o := range b.objects {
if !(o.metadata.Generation <= b.prevGeneration) {
panic(
fmt.Sprintf(
"Object generation %v exceeds %v",
o.metadata.Generation,
b.prevGeneration))
}
}
}
// Create an object struct for the given attributes and contents.
//
// LOCKS_REQUIRED(b.mu)
func (b *bucket) mintObject(
req *gcs.CreateObjectRequest,
contents []byte) (o fakeObject) {
md5Sum := md5.Sum(contents)
crc32c := crc32.Checksum(contents, crc32cTable)
// Set up basic info.
b.prevGeneration++
o.metadata = gcs.Object{
Name: req.Name,
ContentType: req.ContentType,
ContentLanguage: req.ContentLanguage,
CacheControl: req.CacheControl,
Owner: "user-fake",
Size: uint64(len(contents)),
ContentEncoding: req.ContentEncoding,
ComponentCount: 1,
MD5: &md5Sum,
CRC32C: &crc32c,
MediaLink: "http://localhost/download/storage/fake/" + req.Name,
Metadata: copyMetadata(req.Metadata),
Generation: b.prevGeneration,
MetaGeneration: 1,
StorageClass: "STANDARD",
Updated: b.clock.Now(),
}
// Set up data.
o.data = contents
return
}
// Create a folder struct for the given name.
//
// LOCKS_REQUIRED(b.mu)
func (b *bucket) mintFolder(folderName string) (f gcs.Folder) {
f = gcs.Folder{
Name: folderName,
UpdateTime: b.clock.Now(),
}
return
}
// In a hierarchical bucket, every directory object is also retained as a
// folder entry, even when objects are created through the non-control client
// API. Therefore, whenever we create directory objects in the fake bucket, we
// also need to create corresponding folder entries for them in HNS.
//
// For example, when creating an object A/B/a.txt where A and B are implicit
// directories, the existing fake-bucket flow does not add entries for A and
// B. In HNS, these implicit directories have to be added as folder entries.
func (b *bucket) addFolderEntry(path string) {
path = filepath.Dir(path) // Get the directory part of the path
parts := strings.Split(path, string(filepath.Separator))
// This is for adding implicit directories as folder entries.
// For example, createObject(A/B/a.txt) where A and B are implicit directories.
// We need to add both "A" and "A/B/" as folder entries.
for i := range parts {
folder := gcs.Folder{Name: strings.Join(parts[:i+1], string(filepath.Separator)) + string(filepath.Separator)}
existingIndex := b.folders.find(folder.Name)
if existingIndex == len(b.folders) {
b.folders = append(b.folders, folder)
}
}
sort.Sort(b.folders)
}
func preconditionChecks(b *bucket, req *gcs.CreateObjectRequest, contents []byte) (err error) {
// Find any existing record for this name.
existingIndex := b.objects.find(req.Name)
var existingRecord *fakeObject
if existingIndex < len(b.objects) {
existingRecord = &b.objects[existingIndex]
}
// Check the provided checksum, if any.
if req.CRC32C != nil {
actual := crc32.Checksum(contents, crc32cTable)
if actual != *req.CRC32C {
err = fmt.Errorf(
"CRC32C mismatch: got 0x%08x, expected 0x%08x",
actual,
*req.CRC32C)
return
}
}
// Check the provided hash, if any.
if req.MD5 != nil {
actual := md5.Sum(contents)
if actual != *req.MD5 {
err = fmt.Errorf(
"MD5 mismatch: got %s, expected %s",
hex.EncodeToString(actual[:]),
hex.EncodeToString(req.MD5[:]))
return
}
}
// Check preconditions.
if req.GenerationPrecondition != nil {
if *req.GenerationPrecondition == 0 && existingRecord != nil {
err = &gcs.PreconditionError{
Err: errors.New("precondition failed: object exists"),
}
return
}
if *req.GenerationPrecondition > 0 {
if existingRecord == nil {
err = &gcs.PreconditionError{
Err: errors.New("precondition failed: object doesn't exist"),
}
return
}
existingGen := existingRecord.metadata.Generation
if existingGen != *req.GenerationPrecondition {
err = &gcs.PreconditionError{
Err: fmt.Errorf(
"precondition failed: object has generation %v",
existingGen),
}
return
}
}
}
if req.MetaGenerationPrecondition != nil {
if existingRecord == nil {
err = &gcs.PreconditionError{
Err: errors.New("precondition failed: object doesn't exist"),
}
return
}
existingMetaGen := existingRecord.metadata.MetaGeneration
if existingMetaGen != *req.MetaGenerationPrecondition {
err = &gcs.PreconditionError{
Err: fmt.Errorf(
"precondition failed: object has meta-generation %v",
existingMetaGen),
}
return
}
}
return
}
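// To summarize the precondition semantics above: a GenerationPrecondition
// pointing at 0 asserts that no live object exists under the name, a pointer
// to N > 0 asserts that the live generation is exactly N, and a nil pointer
// imposes no precondition. A minimal sketch (hypothetical):
//
//	gen := int64(0)
//	req := &gcs.CreateObjectRequest{
//		Name:                   "foo",
//		Contents:               strings.NewReader("hello"),
//		GenerationPrecondition: &gen, // fails with *gcs.PreconditionError if "foo" exists
//	}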
func createOrUpdateFakeObject(b *bucket, req *gcs.CreateObjectRequest, contents []byte) (o *gcs.Object, err error) {
// Create an object record from the given attributes.
var fo fakeObject = b.mintObject(req, contents)
o = copyObject(&fo.metadata)
// Replace an entry in or add an entry to our list of objects.
existingIndex := b.objects.find(req.Name)
if existingIndex < len(b.objects) {
b.objects[existingIndex] = fo
} else {
b.objects = append(b.objects, fo)
sort.Sort(b.objects)
}
if b.BucketType().Hierarchical {
b.addFolderEntry(req.Name)
}
return
}
// LOCKS_REQUIRED(b.mu)
func (b *bucket) createObjectLocked(
req *gcs.CreateObjectRequest) (o *gcs.Object, err error) {
// Check that the name is legal.
err = checkName(req.Name)
if err != nil {
return
}
contents, err := io.ReadAll(req.Contents)
if err != nil {
err = fmt.Errorf("ReadAll: %v", err)
return
}
err = preconditionChecks(b, req, contents)
if err != nil {
return nil, err
}
return createOrUpdateFakeObject(b, req, contents)
}
// Create a reader based on the supplied request, also returning the index
// within b.objects of the entry for the requested generation.
//
// LOCKS_REQUIRED(b.mu)
func (b *bucket) newReaderLocked(
req *gcs.ReadObjectRequest) (r io.Reader, index int, err error) {
// Find the object with the requested name.
index = b.objects.find(req.Name)
if index == len(b.objects) {
err = &gcs.NotFoundError{
Err: fmt.Errorf("object %s not found", req.Name),
}
return
}
o := b.objects[index]
// Does the generation match?
if req.Generation != 0 && req.Generation != o.metadata.Generation {
err = &gcs.NotFoundError{
Err: fmt.Errorf(
"object %s generation %v not found", req.Name, req.Generation),
}
return
}
// Extract the requested range.
result := o.data
if req.Range != nil {
start := req.Range.Start
limit := req.Range.Limit
l := uint64(len(result))
if start > limit {
start = 0
limit = 0
}
if start > l {
start = 0
limit = 0
}
if limit > l {
limit = l
}
result = result[start:limit]
}
r = bytes.NewReader(result)
return
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
func copyMetadata(in map[string]string) (out map[string]string) {
if in == nil {
return
}
out = make(map[string]string)
for k, v := range in {
out[k] = v
}
return
}
func copyObject(o *gcs.Object) *gcs.Object {
var copy gcs.Object = *o
copy.Metadata = copyMetadata(o.Metadata)
return &copy
}
func copyMinObject(o *gcs.Object) *gcs.MinObject {
var copy gcs.MinObject
copy.Name = o.Name
copy.Size = o.Size
copy.Generation = o.Generation
copy.MetaGeneration = o.MetaGeneration
copy.Updated = o.Updated
copy.Metadata = copyMetadata(o.Metadata)
copy.ContentEncoding = o.ContentEncoding
copy.CRC32C = o.CRC32C
return &copy
}
////////////////////////////////////////////////////////////////////////
// Public interface
////////////////////////////////////////////////////////////////////////
func (b *bucket) Name() string {
return b.name
}
func (b *bucket) BucketType() gcs.BucketType {
return b.bucketType
}
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) ListObjects(
ctx context.Context,
req *gcs.ListObjectsRequest) (listing *gcs.Listing, err error) {
b.mu.Lock()
defer b.mu.Unlock()
// Set up the result object.
listing = new(gcs.Listing)
// Handle defaults.
maxResults := req.MaxResults
if maxResults == 0 {
maxResults = 1000
}
// Find where in the space of object names to start.
nameStart := req.Prefix
if req.ContinuationToken != "" && req.ContinuationToken > nameStart {
nameStart = req.ContinuationToken
}
// Find the range of indexes within the array to scan.
indexStart := b.objects.lowerBound(nameStart)
prefixLimit := b.objects.prefixUpperBound(req.Prefix)
indexLimit := minInt(indexStart+maxResults, prefixLimit)
// Scan the array.
var lastResultWasPrefix bool
for i := indexStart; i < indexLimit; i++ {
var o fakeObject = b.objects[i]
name := o.metadata.Name
// Search for a delimiter if necessary.
if req.Delimiter != "" {
// Search only in the part after the prefix.
nameMinusQueryPrefix := name[len(req.Prefix):]
delimiterIndex := strings.Index(nameMinusQueryPrefix, req.Delimiter)
if delimiterIndex >= 0 {
resultPrefixLimit := delimiterIndex
// Transform to an index within name.
resultPrefixLimit += len(req.Prefix)
// Include the delimiter in the result.
resultPrefixLimit += len(req.Delimiter)
// Save the result, but only if it's not a duplicate.
resultPrefix := name[:resultPrefixLimit]
if len(listing.CollapsedRuns) == 0 ||
listing.CollapsedRuns[len(listing.CollapsedRuns)-1] != resultPrefix {
listing.CollapsedRuns = append(listing.CollapsedRuns, resultPrefix)
}
// In hierarchical buckets, a directory is represented both as a prefix and
// as a folder. Consequently, if a folder entry exists for this prefix, the
// corresponding object entry is exclusively a prefix entry.
//
// This check is needed because CreateFolder has to add an entry to the
// objects list, and from that entry alone we cannot tell whether it is
// solely a prefix.
//
// For example, mkdir test creates both a folder entry and a test/ prefix.
// Our CreateFolder fake-bucket implementation creates both a folder and an
// object for the given folder name, so the object entry by itself does not
// indicate whether it is only a prefix and not a real object. Hence this
// check.
//
// Note that in a real ListObjects call, such an entry appears only as a
// prefix and not as an object.
folderIndex := b.folders.find(resultPrefix)
if folderIndex < len(b.folders) {
lastResultWasPrefix = true
continue
}
isTrailingDelimiter := (delimiterIndex == len(nameMinusQueryPrefix)-1)
if !isTrailingDelimiter || !req.IncludeTrailingDelimiter {
lastResultWasPrefix = true
continue
}
}
}
lastResultWasPrefix = false
// Otherwise, return as an object result. Make a copy to avoid handing back
// internal state.
listing.MinObjects = append(listing.MinObjects, copyMinObject(&o.metadata))
}
// Set up a cursor for where to start the next scan if we didn't exhaust the
// results.
if indexLimit < prefixLimit {
// If the final object we visited was returned as an element in
// listing.CollapsedRuns, we want to skip all other objects that would
// result in the same prefix, so that we don't return duplicate elements
// in listing.CollapsedRuns across requests.
if lastResultWasPrefix {
lastResultPrefix := listing.CollapsedRuns[len(listing.CollapsedRuns)-1]
listing.ContinuationToken = prefixSuccessor(lastResultPrefix)
// Check an assumption: prefixSuccessor cannot result in the empty string
// above because object names must be non-empty UTF-8 strings, and there
// is no valid non-empty UTF-8 string that consists of entirely 0xff
// bytes.
if listing.ContinuationToken == "" {
err = errors.New("unexpected empty string from prefixSuccessor")
return
}
} else {
// Otherwise, we'll start scanning at the next object.
listing.ContinuationToken = b.objects[indexLimit].metadata.Name
}
}
return
}
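// A minimal listing sketch (hypothetical names): with objects
// ["a/1", "a/2", "b"] in a flat bucket and a request with an empty Prefix and
// Delimiter: "/", the result has CollapsedRuns == ["a/"] and MinObjects
// containing only "b".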
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) NewReaderWithReadHandle(ctx context.Context, req *gcs.ReadObjectRequest) (rd gcs.StorageReader, err error) {
b.mu.Lock()
defer b.mu.Unlock()
r, _, err := b.newReaderLocked(req)
if err != nil {
return
}
rc := io.NopCloser(r)
rd = &FakeReader{
ReadCloser: rc,
Handle: []byte("opaque-handle"),
}
return
}
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) CreateObject(
ctx context.Context,
req *gcs.CreateObjectRequest) (o *gcs.Object, err error) {
b.mu.Lock()
defer b.mu.Unlock()
o, err = b.createObjectLocked(req)
return
}
func (b *bucket) CreateObjectChunkWriter(ctx context.Context, req *gcs.CreateObjectRequest, _ int, _ func(bytesUploadedSoFar int64)) (gcs.Writer, error) {
return NewFakeObjectWriter(b, req)
}
func (b *bucket) FlushPendingWrites(ctx context.Context, w gcs.Writer) (*gcs.MinObject, error) {
b.mu.Lock()
defer b.mu.Unlock()
offset, err := w.Flush()
if err != nil {
return nil, err
}
fakeObjectWriter, ok := w.(*FakeObjectWriter)
if !ok {
return nil, fmt.Errorf("could not type assert gcs.Writer to FakeObjectWriter")
}
fakeObjectWriter.Object.Size = uint64(offset)
return fakeObjectWriter.Object, nil
}
func (b *bucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.MinObject, error) {
b.mu.Lock()
defer b.mu.Unlock()
err := w.Close()
if err != nil {
return nil, err
}
fakeObjectWriter, ok := w.(*FakeObjectWriter)
if !ok {
return nil, fmt.Errorf("could not type assert gcs.Writer to FakeObjectWriter")
}
return fakeObjectWriter.Object, nil
}
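// A minimal sketch of the chunked-write flow against this fake (hypothetical
// names; assumes gcs.Writer embeds io.Writer):
//
//	w, _ := b.CreateObjectChunkWriter(ctx, &gcs.CreateObjectRequest{Name: "foo"}, 0, nil)
//	_, _ = w.Write([]byte("hello"))
//	minObj, err := b.FinalizeUpload(ctx, w)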
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) CopyObject(
ctx context.Context,
req *gcs.CopyObjectRequest) (o *gcs.Object, err error) {
b.mu.Lock()
defer b.mu.Unlock()
// Check that the destination name is legal.
err = checkName(req.DstName)
if err != nil {
return
}
// Does the object exist?
srcIndex := b.objects.find(req.SrcName)
if srcIndex == len(b.objects) {
err = &gcs.NotFoundError{
Err: fmt.Errorf("object %q not found", req.SrcName),
}
return
}
// Does it have the correct generation?
if req.SrcGeneration != 0 &&
b.objects[srcIndex].metadata.Generation != req.SrcGeneration {
err = &gcs.NotFoundError{
Err: fmt.Errorf(
"object %s generation %d not found", req.SrcName, req.SrcGeneration),
}
return
}
// Does it have the correct meta-generation?
if req.SrcMetaGenerationPrecondition != nil {
p := *req.SrcMetaGenerationPrecondition
if b.objects[srcIndex].metadata.MetaGeneration != p {
err = &gcs.PreconditionError{
Err: fmt.Errorf(
"object %q has meta-generation %d",
req.SrcName,
b.objects[srcIndex].metadata.MetaGeneration),
}
return
}
}
// Copy it and assign a new generation number, to ensure that the generation
// number for the destination name is strictly increasing.
dst := b.objects[srcIndex]
dst.metadata.Name = req.DstName
dst.metadata.MediaLink = "http://localhost/download/storage/fake/" + req.DstName
b.prevGeneration++
dst.metadata.Generation = b.prevGeneration
// Insert into our array.
existingIndex := b.objects.find(req.DstName)
if existingIndex < len(b.objects) {
b.objects[existingIndex] = dst
} else {
b.objects = append(b.objects, dst)
sort.Sort(b.objects)
}
o = copyObject(&dst.metadata)
return
}
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) ComposeObjects(
ctx context.Context,
req *gcs.ComposeObjectsRequest) (o *gcs.Object, err error) {
b.mu.Lock()
defer b.mu.Unlock()
// GCS doesn't like too few or too many sources.
if len(req.Sources) < 1 {
err = errors.New("you must provide at least one source component")
return
}
if len(req.Sources) > gcs.MaxSourcesPerComposeRequest {
err = errors.New("you have provided too many source components")
return
}
// Find readers for all of the source objects, also computing the sum of
// their component counts.
var srcReaders []io.Reader
var dstComponentCount int64
for _, src := range req.Sources {
var r io.Reader
var srcIndex int
r, srcIndex, err = b.newReaderLocked(&gcs.ReadObjectRequest{
Name: src.Name,
Generation: src.Generation,
})
if err != nil {
return
}
srcReaders = append(srcReaders, r)
dstComponentCount += b.objects[srcIndex].metadata.ComponentCount
}
// GCS doesn't like the component count to go too high.
if dstComponentCount > gcs.MaxComponentCount {
err = errors.New("result would have too many components")
return
}
// Create the new object.
createReq := &gcs.CreateObjectRequest{
Name: req.DstName,
GenerationPrecondition: req.DstGenerationPrecondition,
MetaGenerationPrecondition: req.DstMetaGenerationPrecondition,
Contents: io.MultiReader(srcReaders...),
ContentType: req.ContentType,
Metadata: req.Metadata,
}
_, err = b.createObjectLocked(createReq)
if err != nil {
return
}
dstIndex := b.objects.find(req.DstName)
metadata := &b.objects[dstIndex].metadata
// Touchup: fix the component count.
metadata.ComponentCount = dstComponentCount
// Touchup: emulate the real GCS behavior of not exporting an MD5 hash for
// composite objects.
metadata.MD5 = nil
o = copyObject(metadata)
return
}
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) StatObject(ctx context.Context,
req *gcs.StatObjectRequest) (m *gcs.MinObject, e *gcs.ExtendedObjectAttributes, err error) {
// If ExtendedObjectAttributes are requested without fetching from gcs enabled, panic.
if !req.ForceFetchFromGcs && req.ReturnExtendedObjectAttributes {
panic("invalid StatObjectRequest: ForceFetchFromGcs: false and ReturnExtendedObjectAttributes: true")
}
b.mu.Lock()
defer b.mu.Unlock()
// Does the object exist?
index := b.objects.find(req.Name)
if index == len(b.objects) {
err = &gcs.NotFoundError{
Err: fmt.Errorf("object %s not found", req.Name),
}
return
}
// Make a copy to avoid handing back internal state.
o := copyObject(&b.objects[index].metadata)
m = storageutil.ConvertObjToMinObject(o)
if req.ReturnExtendedObjectAttributes {
e = storageutil.ConvertObjToExtendedObjectAttributes(o)
}
return
}
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) UpdateObject(
ctx context.Context,
req *gcs.UpdateObjectRequest) (o *gcs.Object, err error) {
b.mu.Lock()
defer b.mu.Unlock()
// Does the object exist?
index := b.objects.find(req.Name)
if index == len(b.objects) {
err = &gcs.NotFoundError{
Err: fmt.Errorf("object %s not found", req.Name),
}
return
}
var obj *gcs.Object = &b.objects[index].metadata
// Does the generation number match the request?
if req.Generation != 0 && obj.Generation != req.Generation {
err = &gcs.NotFoundError{
Err: fmt.Errorf(
"object %q generation %d not found",
req.Name,
req.Generation),
}
return
}
// Does the meta-generation precondition check out?
if req.MetaGenerationPrecondition != nil &&
obj.MetaGeneration != *req.MetaGenerationPrecondition {
err = &gcs.PreconditionError{
Err: fmt.Errorf(
"object %q has meta-generation %d",
obj.Name,
obj.MetaGeneration),
}
return
}
// Update the entry's basic fields according to the request.
if req.ContentType != nil {
obj.ContentType = *req.ContentType
}
if req.ContentEncoding != nil {
obj.ContentEncoding = *req.ContentEncoding
}
if req.ContentLanguage != nil {
obj.ContentLanguage = *req.ContentLanguage
}
if req.CacheControl != nil {
obj.CacheControl = *req.CacheControl
}
// Update the user metadata if necessary.
if len(req.Metadata) > 0 {
if obj.Metadata == nil {
obj.Metadata = make(map[string]string)
}
for k, v := range req.Metadata {
if v == nil {
delete(obj.Metadata, k)
continue
}
obj.Metadata[k] = *v
}
}
// Bump up the entry generation number and the update time.
obj.MetaGeneration++
obj.Updated = b.clock.Now()
// Make a copy to avoid handing back internal state.
o = copyObject(obj)
return
}
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) DeleteObject(
ctx context.Context,
req *gcs.DeleteObjectRequest) (err error) {
b.mu.Lock()
defer b.mu.Unlock()
// Do we possess the object with the given name?
index := b.objects.find(req.Name)
if index == len(b.objects) {
return
}
// Don't do anything if the generation is wrong.
if req.Generation != 0 &&
b.objects[index].metadata.Generation != req.Generation {
return
}
// Check the meta-generation if requested.
if req.MetaGenerationPrecondition != nil {
p := *req.MetaGenerationPrecondition
if b.objects[index].metadata.MetaGeneration != p {
err = &gcs.PreconditionError{
Err: fmt.Errorf(
"object %q has meta-generation %d",
req.Name,
b.objects[index].metadata.MetaGeneration),
}
return
}
}
// Remove the object.
b.objects = append(b.objects[:index], b.objects[index+1:]...)
return
}
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) MoveObject(ctx context.Context, req *gcs.MoveObjectRequest) (*gcs.Object, error) {
b.mu.Lock()
defer b.mu.Unlock()
// Check that the destination name is legal.
err := checkName(req.DstName)
if err != nil {
return nil, err
}
// Does the source object exist?
srcIndex := b.objects.find(req.SrcName)
if srcIndex == len(b.objects) {
err = &gcs.NotFoundError{
Err: fmt.Errorf("object %q not found", req.SrcName),
}
return nil, err
}
// Does it have the correct generation?
if req.SrcGeneration != 0 &&
b.objects[srcIndex].metadata.Generation != req.SrcGeneration {
err = &gcs.NotFoundError{
Err: fmt.Errorf("object %s generation %d not found", req.SrcName, req.SrcGeneration),
}
return nil, err
}
// Does it have the correct meta-generation?
if req.SrcMetaGenerationPrecondition != nil {
p := *req.SrcMetaGenerationPrecondition
if b.objects[srcIndex].metadata.MetaGeneration != p {
err = &gcs.PreconditionError{
Err: fmt.Errorf("object %q has meta-generation %d", req.SrcName, b.objects[srcIndex].metadata.MetaGeneration),
}
return nil, err
}
}
// Move it and assign a new generation number, to ensure that the generation
// number for the destination name is strictly increasing.
dst := b.objects[srcIndex]
dst.metadata.Name = req.DstName
dst.metadata.MediaLink = "http://localhost/download/storage/fake/" + req.DstName
b.prevGeneration++
dst.metadata.Generation = b.prevGeneration
// Remove the source object.
b.objects = append(b.objects[:srcIndex], b.objects[srcIndex+1:]...)
// Insert dest object into our array.
existingIndex := b.objects.find(req.DstName)
if existingIndex < len(b.objects) {
b.objects[existingIndex] = dst
} else {
b.objects = append(b.objects, dst)
sort.Sort(b.objects)
}
o := copyObject(&dst.metadata)
return o, err
}
func (b *bucket) DeleteFolder(ctx context.Context, folderName string) (err error) {
b.mu.Lock()
defer b.mu.Unlock()
// Do we possess the folder with the given name?
index := b.folders.find(folderName)
if index == len(b.folders) {
return
}
// Remove the folder.
b.folders = append(b.folders[:index], b.folders[index+1:]...)
// In a hierarchical bucket, the control client API deletes folders as well
// as prefixes for backward compatibility. Therefore, a prefix object entry
// needs to be deleted here as well.
// Do we possess the prefix object with the given name?
index = b.objects.find(folderName)
if index == len(b.objects) {
return
}
// Remove the prefix object.
b.objects = append(b.objects[:index], b.objects[index+1:]...)
return
}
func (b *bucket) GetFolder(ctx context.Context, foldername string) (*gcs.Folder, error) {
b.mu.Lock()
defer b.mu.Unlock()
// Does the folder exist?
index := b.folders.find(foldername)
if index == len(b.folders) {
err := &gcs.NotFoundError{
Err: fmt.Errorf("object %s not found", foldername),
}
return nil, err
}
// Make a copy to avoid handing back internal state.
folder := b.folders[index]
return &folder, nil
}
func (b *bucket) CreateFolder(ctx context.Context, folderName string) (*gcs.Folder, error) {
b.mu.Lock()
defer b.mu.Unlock()
// Check that the name is legal.
err := checkName(folderName)
if err != nil {
return nil, err
}
// Find any existing record for this name.
existingIndex := b.folders.find(folderName)
// Create a folder record from the given attributes.
fo := b.mintFolder(folderName)
// Replace an entry in or add an entry to our list of folders.
if existingIndex < len(b.folders) {
b.folders[existingIndex] = fo
} else {
b.folders = append(b.folders, fo)
sort.Sort(b.folders)
}
// In a hierarchical bucket, the control client API creates folders as well
// as prefixes for backward compatibility. Therefore, a prefix object entry
// needs to be created here as well.
// Find any existing record for this name.
existingObjectPrefixIndex := b.objects.find(folderName)
// Create a prefix object record from the given attributes.
var prefixObject fakeObject
prefixObject.metadata = gcs.Object{Name: folderName}
// Replace an entry in or add an entry to our list of objects.
if existingObjectPrefixIndex < len(b.objects) {
b.objects[existingObjectPrefixIndex] = prefixObject
} else {
b.objects = append(b.objects, prefixObject)
sort.Sort(b.objects)
}
return &fo, nil
}
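// For example (hypothetical): CreateFolder(ctx, "test/") adds both a folder
// entry "test/" and a zero-byte placeholder object "test/", mirroring the
// control client's backward-compatibility behavior described above.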
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) RenameFolder(ctx context.Context, folderName string, destinationFolderId string) (*gcs.Folder, error) {
b.mu.Lock()
defer b.mu.Unlock()
// Check that the destination name is legal.
err := checkName(destinationFolderId)
if err != nil {
return nil, err
}
// Check if the source folder exists.
srcIndex := b.folders.find(folderName)
if srcIndex == len(b.folders) {
err = &gcs.NotFoundError{
Err: fmt.Errorf("object %q not found", folderName),
}
return nil, err
}
// Find all folders starting with the given prefix and update their names.
for i := range b.folders {
if strings.HasPrefix(b.folders[i].Name, folderName) {
b.folders[i].Name = strings.Replace(b.folders[i].Name, folderName, destinationFolderId, 1)
b.folders[i].UpdateTime = b.clock.Now()
}
}
// Sort the updated folders.
sort.Sort(b.folders)
// Find all objects starting with the given prefix and update their names.
for i := range b.objects {
if strings.HasPrefix(b.objects[i].metadata.Name, folderName) {
b.objects[i].metadata.Name = strings.Replace(b.objects[i].metadata.Name, folderName, destinationFolderId, 1)
b.objects[i].metadata.Updated = b.clock.Now()
}
}
// Sort the updated objects.
sort.Sort(b.objects)
// Return the updated folder.
folder := &gcs.Folder{
Name: destinationFolderId,
UpdateTime: b.clock.Now(),
}
return folder, nil
}
// LOCKS_EXCLUDED(b.mu)
func (b *bucket) NewMultiRangeDownloader(
ctx context.Context, req *gcs.MultiRangeDownloaderRequest) (gcs.MultiRangeDownloader, error) {
b.mu.Lock()
defer b.mu.Unlock()
index := b.objects.find(req.Name)
if index == len(b.objects) {
return nil, &gcs.NotFoundError{Err: fmt.Errorf("object %s not found in fake bucket", req.Name)}
}
obj := b.objects[index]
if req.Generation != 0 && obj.metadata.Generation != req.Generation {
return nil, &gcs.NotFoundError{Err: fmt.Errorf("not found object %s in fake-bucket with generation %v", req.Name, req.Generation)}
}
if obj.data == nil {
return nil, fmt.Errorf("found no content for object %s in fake-bucket", req.Name)
}
return &fakeMultiRangeDownloader{obj: &obj}, nil
}