func()

in step_copy_gcs_objects.go [150:203]


func (c *CopyGCSObjects) run(ctx context.Context, s *Step) DError {
	var wg sync.WaitGroup
	w := s.w
	e := make(chan DError)
	for _, co := range *c {
		wg.Add(1)
		go func(co CopyGCSObject) {
			defer wg.Done()
			sBkt, sObj, err := splitGCSPath(co.Source)
			if err != nil {
				e <- err
				return
			}
			dBkt, dObj, err := splitGCSPath(co.Destination)
			if err != nil {
				e <- err
				return
			}

			if sObj == "" || strings.HasSuffix(sObj, "/") {
				if err := recursiveGCS(ctx, s.w, sBkt, sObj, dBkt, dObj, co.ACLRules); err != nil {
					e <- Errf("error copying from %s to %s: %v", co.Source, co.Destination, err)
					return
				}
				return
			}

			src := s.w.StorageClient.Bucket(sBkt).Object(sObj)
			dstPath := s.w.StorageClient.Bucket(dBkt).Object(dObj)
			if _, err := dstPath.CopierFrom(src).Run(ctx); err != nil {
				e <- Errf("error copying from %s to %s: %v", co.Source, co.Destination, err)
				return
			}
			for _, acl := range co.ACLRules {
				if err := dstPath.ACL().Set(ctx, acl.Entity, acl.Role); err != nil {
					e <- Errf("error setting ACLRule on %s: %v", co.Destination, err)
					return
				}
			}
		}(co)
	}

	go func() {
		wg.Wait()
		e <- nil
	}()

	select {
	case err := <-e:
		return err
	case <-w.Cancel:
		return nil
	}
}