util/gcs/fake/fake.go (305 lines of code) (raw):

/* Copyright 2018 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package fake import ( "bytes" "context" "errors" "fmt" "io" "net/http" "sync" "cloud.google.com/go/storage" "github.com/GoogleCloudPlatform/testgrid/util/gcs" "google.golang.org/api/googleapi" "google.golang.org/api/iterator" ) // ConditionalClient is a fake conditional client that can limit actions to matching conditions. type ConditionalClient struct { UploadClient read, write *storage.Conditions Lock *sync.RWMutex } func (cc *ConditionalClient) check(ctx context.Context, from, to *gcs.Path) error { if from != nil && cc.read != nil { attrs, err := cc.UploadClient.Stat(ctx, *from) switch { case err != nil: return err case cc.read.GenerationMatch != 0 && cc.read.GenerationMatch != attrs.Generation: return fmt.Errorf("bad genneration due to GenerationMatch: %w", &googleapi.Error{ Code: http.StatusPreconditionFailed, }) case cc.read.GenerationNotMatch != 0 && cc.read.GenerationNotMatch == attrs.Generation: return fmt.Errorf("bad generation due to GenerationNotMatch: %w", &googleapi.Error{ Code: http.StatusPreconditionFailed, }) } } if to != nil && cc.write != nil { attrs, err := cc.UploadClient.Stat(ctx, *to) switch { case err == storage.ErrObjectNotExist: if cc.write.GenerationMatch != 0 { return fmt.Errorf("bad generation: %w", &googleapi.Error{ Code: http.StatusPreconditionFailed, }) } case err != nil: return err case cc.write.GenerationMatch != 0 && cc.write.GenerationMatch != attrs.Generation: return fmt.Errorf("bad generation due to GenerationMatch: %w", &googleapi.Error{ Code: http.StatusPreconditionFailed, }) case cc.write.GenerationNotMatch != 0 && cc.write.GenerationNotMatch == attrs.Generation: return fmt.Errorf("bad generation due to GenerationNotMatch: %w", &googleapi.Error{ Code: http.StatusPreconditionFailed, }) } } return nil } // Copy copies the contents of 'from' into 'to'. func (cc *ConditionalClient) Copy(ctx context.Context, from, to gcs.Path) (*storage.ObjectAttrs, error) { if cc.Lock != nil { cc.Lock.Lock() defer cc.Lock.Unlock() } if err := cc.check(ctx, &from, &to); err != nil { return nil, err } gen := cc.Uploader[to].Generation + 1 if _, err := cc.UploadClient.Copy(ctx, from, to); err != nil { return nil, err } u := cc.Uploader[to] u.Generation = gen cc.Uploader[to] = u return u.Attrs(to), nil } // Upload writes content to the given path. func (cc *ConditionalClient) Upload(ctx context.Context, path gcs.Path, buf []byte, worldRead bool, cache string) (*storage.ObjectAttrs, error) { if cc.Lock != nil { cc.Lock.Lock() defer cc.Lock.Unlock() } if err := cc.check(ctx, nil, &path); err != nil { return nil, err } gen := cc.Uploader[path].Generation + 1 _, err := cc.UploadClient.Upload(ctx, path, buf, worldRead, cache) if err != nil { return nil, err } u := cc.Uploader[path] u.Generation = gen cc.Uploader[path] = u return u.Attrs(path), nil } // If returns a fake conditional client. func (cc *ConditionalClient) If(read, write *storage.Conditions) gcs.ConditionalClient { return &ConditionalClient{ UploadClient: cc.UploadClient, read: read, write: write, Lock: cc.Lock, } } // Open the path conditionally. func (cc *ConditionalClient) Open(ctx context.Context, path gcs.Path) (io.ReadCloser, *storage.ReaderObjectAttrs, error) { if cc.Lock != nil { cc.Lock.RLock() defer cc.Lock.RUnlock() } if err := cc.check(ctx, &path, nil); err != nil { return nil, nil, err } return cc.UploadClient.Open(ctx, path) } // Objects in the path. func (cc *ConditionalClient) Objects(ctx context.Context, path gcs.Path, _, offset string) gcs.Iterator { if cc.Lock != nil { cc.Lock.RLock() defer cc.Lock.RUnlock() } return cc.UploadClient.Objects(ctx, path, "", offset) } // Stat about the path, such as size, generation, etc. func (cc *ConditionalClient) Stat(ctx context.Context, path gcs.Path) (*storage.ObjectAttrs, error) { if cc.Lock != nil { cc.Lock.RLock() defer cc.Lock.RUnlock() } if err := cc.check(ctx, &path, nil); err != nil { return nil, err } return cc.UploadClient.Stat(ctx, path) } // UploadClient is a fake upload client type UploadClient struct { Client Uploader Stater } // If returns a fake upload client. func (fuc UploadClient) If(read, write *storage.Conditions) gcs.ConditionalClient { return fuc } // Stat contains object attributes for a given path. type Stat struct { Err error Attrs storage.ObjectAttrs } // Stater stats given paths. type Stater map[gcs.Path]Stat // Stat returns object attributes for a given path. func (fs Stater) Stat(ctx context.Context, path gcs.Path) (*storage.ObjectAttrs, error) { if err := ctx.Err(); err != nil { return nil, fmt.Errorf("injected interrupt: %w", err) } ret, ok := fs[path] if !ok { return nil, storage.ErrObjectNotExist } if ret.Err != nil { return nil, fmt.Errorf("injected upload error: %w", ret.Err) } return &ret.Attrs, nil } // Uploader adds upload capabilities to a fake client. type Uploader map[gcs.Path]Upload // Copy an object to the specified path func (fu Uploader) Copy(ctx context.Context, from, to gcs.Path) (*storage.ObjectAttrs, error) { if err := ctx.Err(); err != nil { return nil, fmt.Errorf("injected interrupt: %w", err) } u, present := fu[from] if !present { return nil, storage.ErrObjectNotExist } if err := u.Err; err != nil { return nil, fmt.Errorf("injected from error: %w", err) } u.Generation++ fu[to] = u return u.Attrs(to), nil } // Upload writes content to the given path. func (fu Uploader) Upload(ctx context.Context, path gcs.Path, buf []byte, worldRead bool, cacheControl string) (*storage.ObjectAttrs, error) { if err := ctx.Err(); err != nil { return nil, fmt.Errorf("injected interrupt: %w", err) } if err := fu[path].Err; err != nil { return nil, fmt.Errorf("injected upload error: %w", err) } u := Upload{ Buf: buf, CacheControl: cacheControl, WorldRead: worldRead, } fu[path] = u return u.Attrs(path), nil } // Upload represents an upload. type Upload struct { Buf []byte CacheControl string WorldRead bool Err error Generation int64 } // Attrs returns file attributes. func (u Upload) Attrs(path gcs.Path) *storage.ObjectAttrs { return &storage.ObjectAttrs{ Bucket: path.Bucket(), Name: path.Object(), CacheControl: u.CacheControl, Generation: u.Generation, } } // Opener opens given paths. type Opener struct { Paths map[gcs.Path]Object Lock *sync.RWMutex } // Open returns a handle for a given path. func (fo Opener) Open(ctx context.Context, path gcs.Path) (io.ReadCloser, *storage.ReaderObjectAttrs, error) { if fo.Lock != nil { fo.Lock.Lock() defer fo.Lock.Unlock() } o, ok := fo.Paths[path] if !ok { return nil, nil, storage.ErrObjectNotExist } if o.OpenErr != nil { // Only error is OpenErr is specified. if !o.OpenOnRetry { return nil, nil, o.OpenErr } // If retry is also specified, only error the first time. if !o.OpenHasErred { o.OpenHasErred = true fo.Paths[path] = o return nil, nil, o.OpenErr } // else o.OpenOnRetry + o.OpenHasErred, so continue. } return &Reader{ Buf: bytes.NewBufferString(o.Data), ReadErr: o.ReadErr, CloseErr: o.CloseErr, }, o.Attrs, nil } // Object holds data for an object. type Object struct { Data string Attrs *storage.ReaderObjectAttrs OpenOnRetry bool // If true and OpenErr != nil, only error the first time Open() is called. OpenHasErred bool OpenErr error ReadErr error CloseErr error } // A Reader reads a file. type Reader struct { Buf *bytes.Buffer ReadErr error CloseErr error } // Read reads a file's contents. func (fr *Reader) Read(p []byte) (int, error) { if fr.ReadErr != nil { return 0, fr.ReadErr } return fr.Buf.Read(p) } // Close closes a file. func (fr *Reader) Close() error { if fr.CloseErr != nil { return fr.CloseErr } fr.ReadErr = errors.New("already closed") fr.CloseErr = fr.ReadErr return nil } // A Lister returns objects under a prefix. type Lister map[gcs.Path]Iterator // Objects returns an iterator of objects under a given path. func (fl Lister) Objects(ctx context.Context, path gcs.Path, _, offset string) gcs.Iterator { f := fl[path] f.ctx = ctx return &f } // An Iterator returns the attributes of the listed objects or an iterator.Done error. type Iterator struct { Objects []storage.ObjectAttrs Idx int Err int // must be > 0 ctx context.Context Offset string ErrOpen error } // A Client can list files and open them for reading. type Client struct { Lister Opener } // Next returns the next value. func (fi *Iterator) Next() (*storage.ObjectAttrs, error) { if fi.ctx.Err() != nil { return nil, fi.ctx.Err() } if fi.ErrOpen != nil { return nil, fi.ErrOpen } for fi.Idx < len(fi.Objects) { if fi.Offset == "" { break } name, prefix := fi.Objects[fi.Idx].Name, fi.Objects[fi.Idx].Prefix if name != "" && name < fi.Offset { continue } if prefix != "" && prefix < fi.Offset { continue } fi.Idx++ } if fi.Idx >= len(fi.Objects) { return nil, iterator.Done } if fi.Idx > 0 && fi.Idx == fi.Err { return nil, errors.New("injected Iterator error") } o := fi.Objects[fi.Idx] fi.Idx++ return &o, nil }