in step_copy_gcs_objects.go [150:203]
// run starts one goroutine per CopyGCSObject in c and waits for the outcome.
//
// For each entry, the source and destination are parsed with splitGCSPath.
// When the source object name is empty or ends in "/", the copy is delegated
// to recursiveGCS; otherwise the single object is copied with the storage
// client and each configured ACL rule is then applied to the destination.
//
// It returns the first error placed on the result channel, nil once every
// goroutine has finished without reporting an error, or nil as soon as the
// workflow's Cancel channel fires (without waiting for in-flight copies).
func (c *CopyGCSObjects) run(ctx context.Context, s *Step) DError {
	var wg sync.WaitGroup
	w := s.w

	// Buffered so that no sender can ever block: each worker sends at most one
	// error and the waiter goroutine sends exactly one nil. run receives only
	// a single value (or none on cancel), so with an unbuffered channel every
	// remaining sender would block forever and leak its goroutine.
	e := make(chan DError, len(*c)+1)

	for _, co := range *c {
		wg.Add(1)
		go func(co CopyGCSObject) {
			defer wg.Done()

			sBkt, sObj, err := splitGCSPath(co.Source)
			if err != nil {
				e <- err
				return
			}
			dBkt, dObj, err := splitGCSPath(co.Destination)
			if err != nil {
				e <- err
				return
			}

			// A bucket root or a "directory" style path is handed to
			// recursiveGCS instead of being copied as a single object.
			if sObj == "" || strings.HasSuffix(sObj, "/") {
				if err := recursiveGCS(ctx, w, sBkt, sObj, dBkt, dObj, co.ACLRules); err != nil {
					e <- Errf("error copying from %s to %s: %v", co.Source, co.Destination, err)
				}
				return
			}

			src := w.StorageClient.Bucket(sBkt).Object(sObj)
			dstPath := w.StorageClient.Bucket(dBkt).Object(dObj)
			if _, err := dstPath.CopierFrom(src).Run(ctx); err != nil {
				e <- Errf("error copying from %s to %s: %v", co.Source, co.Destination, err)
				return
			}
			for _, acl := range co.ACLRules {
				if err := dstPath.ACL().Set(ctx, acl.Entity, acl.Role); err != nil {
					e <- Errf("error setting ACLRule on %s: %v", co.Destination, err)
					return
				}
			}
		}(co)
	}

	// Signal success after all workers are done. Any worker error is queued
	// before that worker's deferred wg.Done runs, so it is always ahead of
	// this nil in the channel and is the value run receives.
	go func() {
		wg.Wait()
		e <- nil
	}()

	select {
	case err := <-e:
		return err
	case <-w.Cancel:
		return nil
	}
}