step_copy_gcs_objects.go

// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package daisy

import (
	"context"
	"fmt"
	"path"
	"strings"
	"sync"

	"cloud.google.com/go/storage"
	"google.golang.org/api/iterator"
)

// CopyGCSObjects is a Daisy CopyGCSObject workflow step.
type CopyGCSObjects []CopyGCSObject

// CopyGCSObject copies a GCS object from Source to Destination.
type CopyGCSObject struct {
	Source, Destination string
	ACLRules            []*storage.ACLRule `json:",omitempty"`
}

// populate normalizes ACL roles to their canonical upper-case form.
func (c *CopyGCSObjects) populate(ctx context.Context, s *Step) DError {
	for _, co := range *c {
		for _, acl := range co.ACLRules {
			acl.Role = storage.ACLRole(strings.ToUpper(string(acl.Role)))
		}
	}
	return nil
}

func (c *CopyGCSObjects) validate(ctx context.Context, s *Step) DError {
	for _, co := range *c {
		sBkt, _, err := splitGCSPath(co.Source)
		if err != nil {
			return err
		}
		dBkt, dObj, err := splitGCSPath(co.Destination)
		if err != nil {
			return err
		}

		// Add the destination to the workflow's object list.
		if err := s.w.objects.regCreate(path.Join(dBkt, dObj)); err != nil {
			return err
		}

		// Check that the source bucket exists and is readable.
		readableBkts.mx.Lock()
		if !strIn(sBkt, readableBkts.bkts) {
			if _, err := s.w.StorageClient.Bucket(sBkt).Attrs(ctx); err != nil {
				return Errf("error reading bucket %q: %v", sBkt, err)
			}
			readableBkts.bkts = append(readableBkts.bkts, sBkt)
		}
		readableBkts.mx.Unlock()

		// Check that the destination bucket exists and is readable.
		writableBkts.mx.Lock()
		if !strIn(dBkt, writableBkts.bkts) {
			if _, err := s.w.StorageClient.Bucket(dBkt).Attrs(ctx); err != nil {
				return Errf("error reading bucket %q: %v", dBkt, err)
			}

			// Check that the destination bucket is writable by round-tripping
			// a zero-byte scratch object.
			tObj := s.w.StorageClient.Bucket(dBkt).Object(fmt.Sprintf("daisy-validate-%s-%s", s.name, s.w.id))
			w := tObj.NewWriter(ctx)
			if _, err := w.Write(nil); err != nil {
				return newErr("failed to write to GCS object when checking bucket writability", err)
			}
			if err := w.Close(); err != nil {
				return Errf("error writing to bucket %q: %v", dBkt, err)
			}
			if err := tObj.Delete(ctx); err != nil {
				return Errf("error deleting file %+v after write validation: %v", tObj, err)
			}
			writableBkts.bkts = append(writableBkts.bkts, dBkt)
		}
		writableBkts.mx.Unlock()

		// Check each ACLRule.
		for _, acl := range co.ACLRules {
			if acl.Entity == "" {
				return Errf("ACLRule.Entity must not be empty: %+v", acl)
			}
			roles := []string{string(storage.RoleOwner), string(storage.RoleReader), string(storage.RoleWriter)}
			if !strIn(string(acl.Role), roles) {
				return Errf("ACLRule.Role invalid: %q not one of %q", acl.Role, roles)
			}

			// Test ACLRule.Entity.
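			// The test below uploads another zero-byte scratch object, applies
			// the ACLRule to it, and relies on the deferred Delete for cleanup,
			// so a bad entity is caught here rather than mid-copy during run.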
			tObj := s.w.StorageClient.Bucket(dBkt).Object(fmt.Sprintf("daisy-validate-%s-%s", s.name, s.w.id))
			w := tObj.NewWriter(ctx)
			if _, err := w.Write(nil); err != nil {
				return newErr("failed to write to GCS object when testing ACLRule.Entity", err)
			}
			if err := w.Close(); err != nil {
				return newErr("failed to close GCS object when testing ACLRule.Entity", err)
			}
			defer tObj.Delete(ctx)
			if err := tObj.ACL().Set(ctx, acl.Entity, acl.Role); err != nil {
				return Errf("error validating ACLRule %+v: %v", acl, err)
			}
		}
	}
	return nil
}

// recursiveGCS copies every object under sBkt/sPrefix to dBkt/dPrefix,
// applying acls to each copy.
func recursiveGCS(ctx context.Context, w *Workflow, sBkt, sPrefix, dBkt, dPrefix string, acls []*storage.ACLRule) DError {
	it := w.StorageClient.Bucket(sBkt).Objects(ctx, &storage.Query{Prefix: sPrefix})
	for objAttr, err := it.Next(); err != iterator.Done; objAttr, err = it.Next() {
		if err != nil {
			return typedErr(apiError, "failed to iterate GCS objects for copying", err)
		}
		// Skip zero-byte objects such as directory placeholders.
		if objAttr.Size == 0 {
			continue
		}
		srcPath := w.StorageClient.Bucket(sBkt).Object(objAttr.Name)
		o := path.Join(dPrefix, strings.TrimPrefix(objAttr.Name, sPrefix))
		dstPath := w.StorageClient.Bucket(dBkt).Object(o)
		if _, err := dstPath.CopierFrom(srcPath).Run(ctx); err != nil {
			return typedErr(apiError, "failed to copy GCS object", err)
		}
		for _, acl := range acls {
			if err := dstPath.ACL().Set(ctx, acl.Entity, acl.Role); err != nil {
				return typedErr(apiError, "failed to set ACL for GCS object", err)
			}
		}
	}
	return nil
}

func (c *CopyGCSObjects) run(ctx context.Context, s *Step) DError {
	var wg sync.WaitGroup
	w := s.w
	e := make(chan DError)
	for _, co := range *c {
		wg.Add(1)
		go func(co CopyGCSObject) {
			defer wg.Done()
			sBkt, sObj, err := splitGCSPath(co.Source)
			if err != nil {
				e <- err
				return
			}
			dBkt, dObj, err := splitGCSPath(co.Destination)
			if err != nil {
				e <- err
				return
			}

			// An empty object path or a trailing "/" means Source is a
			// prefix: copy everything under it.
			if sObj == "" || strings.HasSuffix(sObj, "/") {
				if err := recursiveGCS(ctx, s.w, sBkt, sObj, dBkt, dObj, co.ACLRules); err != nil {
					e <- Errf("error copying from %s to %s: %v", co.Source, co.Destination, err)
					return
				}
				return
			}

			src := s.w.StorageClient.Bucket(sBkt).Object(sObj)
			dstPath := s.w.StorageClient.Bucket(dBkt).Object(dObj)
			if _, err := dstPath.CopierFrom(src).Run(ctx); err != nil {
				e <- Errf("error copying from %s to %s: %v", co.Source, co.Destination, err)
				return
			}
			for _, acl := range co.ACLRules {
				if err := dstPath.ACL().Set(ctx, acl.Entity, acl.Role); err != nil {
					e <- Errf("error setting ACLRule on %s: %v", co.Destination, err)
					return
				}
			}
		}(co)
	}

	go func() {
		wg.Wait()
		e <- nil
	}()

	select {
	case err := <-e:
		return err
	case <-w.Cancel:
		return nil
	}
}
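
Usage note: a CopyGCSObject entry normally arrives through a Daisy workflow
JSON file, under a "CopyGCSObjects" step, and is decoded into the types above.
The sketch below is a hypothetical in-code equivalent; the bucket names are
invented for illustration.

	copies := CopyGCSObjects{
		{
			// An empty object path or a trailing "/" selects the recursive copy.
			Source:      "gs://example-src-bucket/images/",
			Destination: "gs://example-dst-bucket/backup/",
			ACLRules: []*storage.ACLRule{
				// populate() upper-cases Role, so "reader" in JSON normalizes to "READER".
				{Entity: storage.AllUsers, Role: storage.RoleReader},
			},
		},
	}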